Skip to content

Commit 00e71e9

Browse files
committed
all
1 parent f17f1bb commit 00e71e9

9 files changed

Lines changed: 217 additions & 103 deletions

File tree

be/src/exec/exchange/vdata_stream_sender.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ class Channel {
164164

165165
std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins,
166166
bool eos) {
167+
// here we reuse the callback because it's re-construction may be expensive due to many parameters' capture
167168
if (!_send_callback) {
168169
_send_callback = ExchangeSendCallback<PTransmitDataResult>::create_shared();
169170
} else {

be/src/exec/operator/exchange_sink_buffer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
347347
}
348348
// The eos here only indicates that the current exchange sink has reached eos.
349349
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
350+
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
350351
s = _send_rpc(ins);
351352
if (!s) {
352353
_failed(ins.id,
@@ -473,9 +474,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
473474
} else if (eos) {
474475
_ended(ins);
475476
}
476-
477477
// The eos here only indicates that the current exchange sink has reached eos.
478478
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
479+
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
479480
s = _send_rpc(ins);
480481
if (!s) {
481482
_failed(ins.id,

be/src/exec/runtime_filter/runtime_filter.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
3939
auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared();
4040
auto merge_filter_closure =
4141
AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>::
42-
create_unique(merge_filter_request, merge_filter_callback,
43-
state->query_options().ignore_runtime_filter_error
44-
? std::weak_ptr<QueryContext> {}
45-
: state->get_query_ctx_weak());
42+
create_unique(merge_filter_request, merge_filter_callback);
4643
void* data = nullptr;
4744
int len = 0;
4845

be/src/exec/runtime_filter/runtime_filter_mgr.cpp

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
259259
Status st = Status::OK();
260260
// After all runtime filters' size are collected, we should send response to all producers.
261261
if (cnt_val.merger->add_rf_size(request->filter_size())) {
262-
auto ctx = query_ctx->ignore_runtime_filter_error() ? std::weak_ptr<QueryContext> {}
263-
: query_ctx;
264262
for (auto addr : cnt_val.source_addrs) {
265263
std::shared_ptr<PBackendService_Stub> stub(
266264
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
@@ -277,7 +275,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
277275
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
278276
DummyBrpcCallback<PSyncFilterSizeResponse>>::
279277
create_unique(sync_request,
280-
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
278+
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
281279

282280
auto* pquery_id = closure->request_->mutable_query_id();
283281
pquery_id->set_hi(query_ctx->query_id().hi);
@@ -376,17 +374,13 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
376374
}
377375

378376
if (is_ready) {
379-
return _send_rf_to_target(cnt_val,
380-
query_ctx->ignore_runtime_filter_error()
381-
? std::weak_ptr<QueryContext> {}
382-
: query_ctx,
383-
merge_time, request->query_id(), query_ctx->execution_timeout());
377+
return _send_rf_to_target(cnt_val, merge_time, request->query_id(),
378+
query_ctx->execution_timeout());
384379
}
385380
return Status::OK();
386381
}
387382

388383
Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& cnt_val,
389-
std::weak_ptr<QueryContext> ctx,
390384
int64_t merge_time,
391385
PUniqueId query_id,
392386
int execution_timeout) {
@@ -429,7 +423,7 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
429423
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
430424
DummyBrpcCallback<PPublishFilterResponse>>::
431425
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
432-
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
426+
DummyBrpcCallback<PPublishFilterResponse>::create_shared());
433427

434428
closure->request_->set_merge_time(merge_time);
435429
*closure->request_->mutable_query_id() = query_id;

be/src/exec/runtime_filter/runtime_filter_mgr.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ class RuntimeFilterMergeControllerEntity {
182182
const std::vector<TRuntimeFilterTargetParamsV2>&& target_info,
183183
const int producer_size);
184184

185-
Status _send_rf_to_target(GlobalMergeContext& cnt_val, std::weak_ptr<QueryContext> ctx,
186-
int64_t merge_time, PUniqueId query_id, int execution_timeout);
185+
Status _send_rf_to_target(GlobalMergeContext& cnt_val, int64_t merge_time, PUniqueId query_id,
186+
int execution_timeout);
187187

188188
// protect _filter_map
189189
std::shared_mutex _filter_map_mutex;

be/src/exec/runtime_filter/runtime_filter_producer.cpp

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -98,51 +98,55 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
9898
return Status::OK();
9999
}
100100

101-
class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
102-
DummyBrpcCallback<PSendFilterSizeResponse>> {
103-
std::shared_ptr<Dependency> _dependency;
104-
// Should use weak ptr here, because when query context deconstructs, should also delete runtime filter
105-
// context, it not the memory is not released. And rpc is in another thread, it will hold rf context
106-
// after query context because the rpc is not returned.
107-
std::weak_ptr<RuntimeFilterWrapper> _wrapper;
108-
using Base =
109-
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
110-
friend class RuntimeFilterProducer;
111-
ENABLE_FACTORY_CREATOR(SyncSizeClosure);
101+
// Callback for sync-size RPCs. Handles errors (disable wrapper + sub dependency) in call().
102+
class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
103+
ENABLE_FACTORY_CREATOR(SyncSizeCallback);
104+
105+
public:
106+
SyncSizeCallback(std::shared_ptr<Dependency> dependency,
107+
std::shared_ptr<RuntimeFilterWrapper> wrapper,
108+
std::weak_ptr<QueryContext> context)
109+
: _dependency(std::move(dependency)), _wrapper(wrapper), _context(std::move(context)) {}
112110

113-
void _process_if_rpc_failed() override {
114-
Defer defer {[&]() {
115-
Base::_process_if_rpc_failed();
111+
void call() override {
112+
// On error: disable the wrapper and sub the dependency here, because set_synced_size()
113+
// will never be called (the merge node won't respond with a sync).
114+
// On success: do NOT sub here. The merge node will respond with sync_filter_size,
115+
// which calls set_synced_size() -> _dependency->sub().
116+
if (cntl_->Failed()) {
117+
if (auto w = _wrapper.lock()) {
118+
w->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
119+
}
120+
if (auto ctx = _context.lock()) {
121+
if (!ctx->ignore_runtime_filter_error()) {
122+
ctx->cancel(Status::NetworkError("RPC meet failed: {}", cntl_->ErrorText()));
123+
}
124+
}
116125
((CountedFinishDependency*)_dependency.get())->sub();
117-
}};
118-
auto wrapper = _wrapper.lock();
119-
if (!wrapper) {
120126
return;
121127
}
122128

123-
wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
124-
}
125-
126-
void _process_if_meet_error_status(const Status& status) override {
127-
Defer defer {[&]() {
128-
Base::_process_if_meet_error_status(status);
129+
Status status = Status::create(response_->status());
130+
if (!status.ok()) {
131+
if (auto w = _wrapper.lock()) {
132+
w->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
133+
}
134+
if (auto ctx = _context.lock()) {
135+
if (!ctx->ignore_runtime_filter_error()) {
136+
ctx->cancel(Status::NetworkError("RPC meet failed: {}", status.to_string()));
137+
}
138+
}
129139
((CountedFinishDependency*)_dependency.get())->sub();
130-
}};
131-
auto wrapper = _wrapper.lock();
132-
if (!wrapper) {
133-
return;
134140
}
135-
136-
wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
137141
}
138142

139-
public:
140-
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
141-
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
142-
std::shared_ptr<Dependency> dependency,
143-
std::shared_ptr<RuntimeFilterWrapper> wrapper,
144-
std::weak_ptr<QueryContext> context)
145-
: Base(req, callback, context), _dependency(std::move(dependency)), _wrapper(wrapper) {}
143+
private:
144+
std::shared_ptr<Dependency> _dependency;
145+
// Should use weak ptr here, because when query context deconstructs, should also delete runtime filter
146+
// context, it not the memory is not released. And rpc is in another thread, it will hold rf context
147+
// after query context because the rpc is not returned.
148+
std::weak_ptr<RuntimeFilterWrapper> _wrapper;
149+
std::weak_ptr<QueryContext> _context;
146150
};
147151

148152
void RuntimeFilterProducer::latch_dependency(
@@ -208,14 +212,16 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt
208212

209213
auto request = std::make_shared<PSendFilterSizeRequest>();
210214
request->set_stage(_stage);
211-
212-
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
215+
auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper,
216+
state->get_query_ctx()->weak_from_this());
217+
// Store callback in the producer to keep it alive until the RPC completes.
218+
// AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback
219+
// would be destroyed when this function returns and error-path sub() would never fire.
220+
_sync_size_callback = callback;
213221
// RuntimeFilter maybe deconstructed before the rpc finished, so that could not use
214222
// a raw pointer in closure. Has to use the context's shared ptr.
215-
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper,
216-
state->query_options().ignore_runtime_filter_error
217-
? std::weak_ptr<QueryContext> {}
218-
: state->get_query_ctx_weak());
223+
auto closure = AutoReleaseClosure<PSendFilterSizeRequest, SyncSizeCallback>::create_unique(
224+
request, callback);
219225
auto* pquery_id = request->mutable_query_id();
220226
pquery_id->set_hi(state->get_query_ctx()->query_id().hi);
221227
pquery_id->set_lo(state->get_query_ctx()->query_id().lo);

be/src/exec/runtime_filter/runtime_filter_producer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ class RuntimeFilterProducer : public RuntimeFilter {
181181

182182
int64_t _synced_size = -1;
183183
std::shared_ptr<CountedFinishDependency> _dependency;
184+
// Holds the SyncSizeCallback alive until the send_filter_size RPC completes.
185+
// AutoReleaseClosure stores callbacks via weak_ptr, so without this the callback
186+
// would be destroyed when send_size() returns, and error-path sub() would never fire.
187+
// Type-erased because the callback type is defined in the .cpp file.
188+
std::shared_ptr<void> _sync_size_callback;
184189

185190
std::atomic<State> _rf_state;
186191
};

be/src/util/brpc_closure.h

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
#include <google/protobuf/stubs/common.h>
2121

22-
#include <atomic>
2322
#include <utility>
2423

25-
#include "runtime/query_context.h"
2624
#include "runtime/thread_context.h"
2725
#include "service/brpc.h"
2826

@@ -84,9 +82,8 @@ class AutoReleaseClosure : public google::protobuf::Closure {
8482
ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
8583

8684
public:
87-
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback,
88-
std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {})
89-
: request_(req), callback_(callback), context_(std::move(context)) {
85+
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback)
86+
: request_(req), callback_(callback) {
9087
this->cntl_ = callback->cntl_;
9188
this->response_ = callback->response_;
9289
}
@@ -96,15 +93,15 @@ class AutoReleaseClosure : public google::protobuf::Closure {
9693
// Will delete itself
9794
void Run() override {
9895
Defer defer {[&]() { delete this; }};
99-
// If lock failed, it means the callback object is deconstructed, then no need
100-
// to deal with the callback any more.
101-
if (auto tmp = callback_.lock()) {
102-
tmp->call();
103-
}
96+
// shouldn't do heavy work here. all heavy work should be done in callback's call() (which means in success/failure handlers)
10497
if (cntl_->Failed()) {
105-
_process_if_rpc_failed();
98+
LOG(WARNING) << "brpc failed: " << cntl_->ErrorText();
10699
} else {
107-
_process_status<ResponseType>(response_.get());
100+
_log_error_status<ResponseType>(response_.get());
101+
}
102+
// this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_)
103+
if (auto tmp = callback_.lock()) {
104+
tmp->call();
108105
}
109106
}
110107

@@ -116,45 +113,24 @@ class AutoReleaseClosure : public google::protobuf::Closure {
116113
// at any stage.
117114
std::shared_ptr<Request> request_;
118115
std::shared_ptr<ResponseType> response_;
119-
std::string error_msg_;
120-
121-
protected:
122-
virtual void _process_if_rpc_failed() {
123-
std::string error_msg =
124-
fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_);
125-
if (auto ctx = context_.lock(); ctx) {
126-
ctx->cancel(Status::NetworkError(error_msg));
127-
} else {
128-
LOG(WARNING) << error_msg;
129-
}
130-
}
131-
132-
virtual void _process_if_meet_error_status(const Status& status) {
133-
if (status.is<ErrorCode::END_OF_FILE>()) {
134-
// no need to log END_OF_FILE, reduce the unlessful log
135-
return;
136-
}
137-
if (auto ctx = context_.lock(); ctx) {
138-
ctx->cancel(status);
139-
} else {
140-
LOG(WARNING) << "RPC meet error status: " << status;
141-
}
142-
}
143116

144117
private:
145-
template <typename Response>
146-
void _process_status(Response* response) {}
147-
148118
template <HasStatus Response>
149-
void _process_status(Response* response) {
119+
void _log_error_status(Response* response) {
150120
if (Status status = Status::create(response->status()); !status.ok()) {
151-
_process_if_meet_error_status(status);
121+
if (!status.is<ErrorCode::END_OF_FILE>()) {
122+
LOG(WARNING) << "RPC meet error status: " << status;
123+
}
152124
}
153125
}
154-
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main
155-
// thread is freed.
126+
127+
template <typename Response>
128+
requires(!HasStatus<Response>)
129+
void _log_error_status(Response* /*response*/) {
130+
// Response type has no status() method, nothing to log.
131+
}
132+
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main thread is freed.
156133
Weak callback_;
157-
std::weak_ptr<QueryContext> context_;
158134
};
159135

160136
} // namespace doris

0 commit comments

Comments
 (0)