Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2379,9 +2379,8 @@ std::string PipelineFragmentContext::debug_string() {
return fmt::to_string(debug_string_buffer);
}

std::vector<std::shared_ptr<TRuntimeProfileTree>>
PipelineFragmentContext::collect_realtime_profile() const {
std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
std::vector<TProfileNodeReport> PipelineFragmentContext::collect_realtime_profile() const {
std::vector<TProfileNodeReport> res;

// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this function is invoked after fragment context
Expand All @@ -2394,16 +2393,26 @@ PipelineFragmentContext::collect_realtime_profile() const {
return res;
}

// Make sure first profile is fragment level profile
auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
_fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
res.push_back(fragment_profile);
TProfileNodeReport fragment_report;
TRuntimeProfileTree fragment_profile;
_fragment_level_profile->to_thrift(&fragment_profile, _runtime_state->profile_level());
fragment_report.__isset.profile = true;
fragment_report.profile = std::move(fragment_profile);
fragment_report.__set_profile_node_type(TProfileNodeType::FRAGMENT_LEVEL);
res.push_back(std::move(fragment_report));

// pipeline_id_to_profile is initialized in prepare stage
int32_t pipeline_id = 0;
for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
res.push_back(profile_ptr);
TProfileNodeReport pipeline_report;
TRuntimeProfileTree pipeline_profile_tree;
pipeline_profile->to_thrift(&pipeline_profile_tree, _runtime_state->profile_level());
pipeline_report.__isset.profile = true;
pipeline_report.profile = std::move(pipeline_profile_tree);
pipeline_report.__set_profile_node_type(TProfileNodeType::PIPELINE_LEVEL);
pipeline_report.__set_pipeline_id(pipeline_id);
res.push_back(std::move(pipeline_report));
pipeline_id++;
}

return res;
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <brpc/closure_guard.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>

Expand Down Expand Up @@ -61,7 +62,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

void print_profile(const std::string& extra_info);

std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile() const;
std::vector<TProfileNodeReport> collect_realtime_profile() const;
std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile() const;

bool is_timeout(timespec now) const;
Expand Down
34 changes: 17 additions & 17 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
return Status::OK();
}

void QueryContext::add_fragment_profile(
int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
if (pipeline_profiles.empty()) {
void QueryContext::add_fragment_profile(int fragment_id,
std::vector<TProfileNodeReport> profile_node_reports,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
if (profile_node_reports.empty()) {
std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}",
print_id(this->_query_id), fragment_id);
LOG_ERROR(msg);
Expand All @@ -414,18 +414,19 @@ void QueryContext::add_fragment_profile(
}

#ifndef NDEBUG
for (const auto& p : pipeline_profiles) {
DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}",
print_id(this->_query_id), fragment_id);
for (const auto& report : profile_node_reports) {
DCHECK(report.__isset.profile)
<< fmt::format("Add pipeline profile failed, query {}, fragment {}",
print_id(this->_query_id), fragment_id);
}
#endif

std::lock_guard<std::mutex> l(_profile_mutex);
VLOG_ROW << fmt::format(
"Query add fragment profile, query {}, fragment {}, pipeline profile count {} ",
print_id(this->_query_id), fragment_id, pipeline_profiles.size());
print_id(this->_query_id), fragment_id, profile_node_reports.size());

_profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
_profile_map.insert(std::make_pair(fragment_id, std::move(profile_node_reports)));

if (load_channel_profile != nullptr) {
_load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile));
Expand All @@ -443,15 +444,15 @@ void QueryContext::_report_query_profile() {
}

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
_query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile);
_query_id, this->coord_addr, fragment_id, std::move(fragment_profile),
load_channel_profile);
}

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_profile_reporting();
}

std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
QueryContext::_collect_realtime_query_profile() {
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res;
void QueryContext::_collect_realtime_query_profile(
std::unordered_map<int, std::vector<TProfileNodeReport>>& profile_node_reports) {
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
for (const auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) {
if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
Expand All @@ -475,17 +476,16 @@ QueryContext::_collect_realtime_query_profile() {
continue;
}

res.insert(std::make_pair(fragment_id, profile));
profile_node_reports.insert(std::make_pair(fragment_id, std::move(profile)));
}
}

return res;
}

TReportExecStatusParams QueryContext::get_realtime_exec_status() {
TReportExecStatusParams exec_status;

auto realtime_query_profile = _collect_realtime_query_profile();
std::unordered_map<int, std::vector<TProfileNodeReport>> realtime_query_profile;
_collect_realtime_query_profile(realtime_query_profile);
std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;

for (auto load_channel_profile : _load_channel_profile_map) {
Expand Down
13 changes: 6 additions & 7 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Types_types.h>
Expand Down Expand Up @@ -376,15 +377,15 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
// PipelineTask 3
// Operator 3
// fragment_id -> list<profile>
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map;
std::unordered_map<int, std::vector<TProfileNodeReport>> _profile_map;
std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map;

std::shared_ptr<std::map<std::string, TAIResource>> _ai_resources;

void _report_query_profile();

std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_collect_realtime_query_profile();
void _collect_realtime_query_profile(
std::unordered_map<int, std::vector<TProfileNodeReport>>& profile_node_reports);

std::mutex _error_url_lock;
std::string _load_error_url;
Expand All @@ -401,10 +402,8 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

public:
// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
void add_fragment_profile(
int fragment_id,
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
void add_fragment_profile(int fragment_id, std::vector<TProfileNodeReport> profile_node_reports,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile);

TReportExecStatusParams get_realtime_exec_status();

Expand Down
78 changes: 36 additions & 42 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,13 @@ static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
}

static void _report_query_profiles_function(
std::unordered_map<
TUniqueId,
std::tuple<
TNetworkAddress,
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>>>
std::unordered_map<TUniqueId,
std::tuple<TNetworkAddress,
std::unordered_map<int, std::vector<TProfileNodeReport>>>>
profile_copy,
std::unordered_map<std::pair<TUniqueId, int32_t>, std::shared_ptr<TRuntimeProfileTree>>
load_channel_profile_copy) {
// query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}}
// query_id -> {coordinator_addr, {fragment_id -> list<profile_node_report>}}
for (auto& entry : profile_copy) {
const auto& query_id = entry.first;
const auto& coor_addr = std::get<0>(entry.second);
Expand Down Expand Up @@ -168,46 +166,37 @@ static void _report_query_profiles_function(

TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_params(
const TUniqueId& query_id,
std::unordered_map<int32_t, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
fragment_id_to_profile,
std::unordered_map<int32_t, std::vector<TProfileNodeReport>>
fragment_id_to_profile_node_reports,
std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles, bool is_done) {
// This function will clear the data of fragment_id_to_profile and load_channel_profiles.
// This function will clear the data of fragment_id_to_profile_node_reports and
// load_channel_profiles.
TQueryProfile profile;
profile.__set_query_id(query_id);

std::map<int32_t, std::vector<TDetailedReportParams>> fragment_id_to_profile_req;
std::map<int32_t, std::vector<TProfileNodeReport>> fragment_id_to_profile_node_reports_req;

for (const auto& entry : fragment_id_to_profile) {
for (auto& entry : fragment_id_to_profile_node_reports) {
int32_t fragment_id = entry.first;
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& fragment_profile = entry.second;
std::vector<TDetailedReportParams> detailed_params;
bool is_first = true;
for (auto pipeline_profile : fragment_profile) {
if (pipeline_profile == nullptr) {
std::vector<TProfileNodeReport>& profile_node_reports = entry.second;
for (const auto& profile_node_report : profile_node_reports) {
if (!profile_node_report.__isset.profile ||
!profile_node_report.__isset.profile_node_type) {
auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
print_id(query_id), fragment_id);
DCHECK(false) << msg;
LOG_ERROR(msg);
continue;
}

TDetailedReportParams tmp;
THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile);
// First profile is fragment level
tmp.__set_is_fragment_level(is_first);
is_first = false;
// tmp.fragment_instance_id is not needed for pipeline x
detailed_params.push_back(std::move(tmp));
}

fragment_id_to_profile_req[fragment_id] = std::move(detailed_params);
fragment_id_to_profile_node_reports_req[fragment_id] = std::move(profile_node_reports);
}

if (fragment_id_to_profile_req.empty()) {
if (fragment_id_to_profile_node_reports_req.empty()) {
LOG_WARNING("No fragment profile found for query {}", print_id(query_id));
}

profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);
profile.__set_fragment_id_to_profile_node_reports(fragment_id_to_profile_node_reports_req);

std::vector<TRuntimeProfileTree> load_channel_profiles_req;
for (auto load_channel_profile : load_channel_profiles) {
Expand All @@ -230,7 +219,9 @@ TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_par
TReportExecStatusParams req;
THRIFT_MOVE_VALUES(req, query_profile, profile);
req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id);
// invalid query id to avoid API compatibility during upgrade
// This RPC is profile-only: the real query id is carried in query_profile.query_id.
// Keep the top-level query id invalid so old FEs do not route this status-less
// report into updateFragmentExecStatus(), which expects params.status.
req.__set_query_id(TUniqueId());
req.__set_done(is_done);

Expand Down Expand Up @@ -265,10 +256,12 @@ void RuntimeQueryStatisticsMgr::trigger_profile_reporting() {
_load_channel_profile_map.swap(load_channel_profile_copy);
}

// ATTN: Local variables are copied to avoid memory reclamation issues.
auto st = _thread_pool->submit_func([profile_copy, load_channel_profile_copy]() {
_report_query_profiles_function(profile_copy, load_channel_profile_copy);
});
auto st = _thread_pool->submit_func(
[profile_copy = std::move(profile_copy),
load_channel_profile_copy = std::move(load_channel_profile_copy)]() mutable {
_report_query_profiles_function(std::move(profile_copy),
std::move(load_channel_profile_copy));
});

if (!st.ok()) {
LOG_WARNING("Failed to submit profile reporting task, reason: {}", st.to_string());
Expand All @@ -290,10 +283,11 @@ void RuntimeQueryStatisticsMgr::stop_report_thread() {

void RuntimeQueryStatisticsMgr::register_fragment_profile(
const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id,
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
std::vector<TProfileNodeReport> profile_node_reports,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
for (const auto& p : p_profiles) {
if (p == nullptr) {
for (const auto& profile_node_report : profile_node_reports) {
if (!profile_node_report.__isset.profile ||
!profile_node_report.__isset.profile_node_type) {
auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
print_id(query_id), fragment_id);
DCHECK(false) << msg;
Expand All @@ -306,20 +300,20 @@ void RuntimeQueryStatisticsMgr::register_fragment_profile(

if (!_profile_map.contains(query_id)) {
_profile_map[query_id] = std::make_tuple(
coor_addr,
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>());
coor_addr, std::unordered_map<int, std::vector<TProfileNodeReport>>());
}

std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
fragment_profile_map = std::get<1>(_profile_map[query_id]);
fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles));
std::unordered_map<int, std::vector<TProfileNodeReport>>& fragment_profile_map =
std::get<1>(_profile_map[query_id]);
auto profile_node_report_size = profile_node_reports.size();
fragment_profile_map.insert(std::make_pair(fragment_id, std::move(profile_node_reports)));

if (load_channel_profile != nullptr) {
_load_channel_profile_map[std::make_pair(query_id, fragment_id)] = load_channel_profile;
}

VLOG_CRITICAL << fmt::format("register x profile done {}, fragment {}, profiles {}",
print_id(query_id), fragment_id, p_profiles.size());
print_id(query_id), fragment_id, profile_node_report_size);
}

void RuntimeQueryStatisticsMgr::register_resource_context(
Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/runtime_query_statistics_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <gen_cpp/Data_types.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Types_types.h>

Expand All @@ -42,8 +43,8 @@ class RuntimeQueryStatisticsMgr {

static TReportExecStatusParams create_report_exec_status_params(
const TUniqueId& q_id,
std::unordered_map<int32_t, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
fragment_id_to_profile,
std::unordered_map<int32_t, std::vector<TProfileNodeReport>>
fragment_id_to_profile_node_reports,
std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profile, bool is_done);

void register_resource_context(std::string query_id,
Expand All @@ -65,7 +66,7 @@ class RuntimeQueryStatisticsMgr {

void register_fragment_profile(const TUniqueId& query_id, const TNetworkAddress& const_addr,
int32_t fragment_id,
std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
std::vector<TProfileNodeReport> profile_node_reports,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile_x);
// When query is finished, try to report query profiles to FE.
// ATTN: Profile is reported to fe fragment by fragment.
Expand All @@ -81,11 +82,10 @@ class RuntimeQueryStatisticsMgr {
std::atomic_bool started = false;
std::mutex _profile_map_lock;

// query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}}
// query_id -> {coordinator_addr, {fragment_id -> std::vector<profile_node_report>}}
std::unordered_map<
TUniqueId,
std::tuple<TNetworkAddress,
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>>>
std::tuple<TNetworkAddress, std::unordered_map<int, std::vector<TProfileNodeReport>>>>
_profile_map;

std::unordered_map<std::pair<TUniqueId, int32_t>, std::shared_ptr<TRuntimeProfileTree>>
Expand Down
Loading
Loading