1 change: 1 addition & 0 deletions be/src/exec/exchange/vdata_stream_sender.h
Expand Up @@ -164,6 +164,7 @@ class Channel {

std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> get_send_callback(RpcInstance* ins,
bool eos) {
// Reuse the callback here because reconstructing it may be expensive: it captures many parameters.
if (!_send_callback) {
_send_callback = ExchangeSendCallback<PTransmitDataResult>::create_shared();
} else {
3 changes: 2 additions & 1 deletion be/src/exec/operator/exchange_sink_buffer.cpp
Expand Up @@ -347,6 +347,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
}
// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
s = _send_rpc(ins);
if (!s) {
_failed(ins.id,
Expand Down Expand Up @@ -473,9 +474,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
} else if (eos) {
_ended(ins);
}

// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
s = _send_rpc(ins);
if (!s) {
_failed(ins.id,
5 changes: 1 addition & 4 deletions be/src/exec/runtime_filter/runtime_filter.cpp
Expand Up @@ -39,10 +39,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared();
auto merge_filter_closure =
AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>::
create_unique(merge_filter_request, merge_filter_callback,
state->query_options().ignore_runtime_filter_error
? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
Contributor

[Behavioral change] _push_to_remote sends a merge_filter RPC with a DummyBrpcCallback (empty call()). Previously, when ignore_runtime_filter_error == false, the old code would cancel the query context on RPC failure. Now, failures are only logged as warnings.

This is functionally safe (runtime filters are an optimization, not correctness), but the behavioral change should be documented in the PR's release notes. A merge_filter failure will result in the runtime filter never completing at the merge node — consumers will wait until the RF timeout expires, then proceed without the filter.
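
The fallback described above (wait for the filter, give up at the timeout, scan unfiltered) can be sketched roughly as follows. `FilterConsumerSketch`, `signal_ready` and `wait_for_filter` are hypothetical names for illustration only and are not the Doris consumer API; the real consumer is dependency-driven rather than blocking.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for a runtime-filter consumer; not the Doris class.
class FilterConsumerSketch {
public:
    // Called when the merged filter arrives. If the merge_filter RPC failed,
    // nothing ever calls this.
    void signal_ready() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _ready = true;
        }
        _cv.notify_all();
    }

    // Blocks until the filter is ready or the timeout expires.
    // Returns true if the filter can be applied, false if the scan should
    // proceed without it.
    bool wait_for_filter(std::chrono::milliseconds timeout) {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(_mutex);
        return _cv.wait_for(lock, timeout, [this] { return _ready; });
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _ready = false;
};
```

In this sketch a lost merge_filter RPC costs the consumer the full timeout before it continues, which is the performance impact the release note would need to mention.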

create_unique(merge_filter_request, merge_filter_callback);
void* data = nullptr;
int len = 0;

14 changes: 4 additions & 10 deletions be/src/exec/runtime_filter/runtime_filter_mgr.cpp
Expand Up @@ -259,8 +259,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
Status st = Status::OK();
// After the sizes of all runtime filters are collected, send the response to all producers.
if (cnt_val.merger->add_rf_size(request->filter_size())) {
auto ctx = query_ctx->ignore_runtime_filter_error() ? std::weak_ptr<QueryContext> {}
: query_ctx;
for (auto addr : cnt_val.source_addrs) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
Expand All @@ -277,7 +275,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
create_unique(sync_request,
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
Comment on lines 275 to +278
Copilot AI Mar 26, 2026

This call site previously passed a QueryContext weak_ptr (conditionally based on ignore_runtime_filter_error) into AutoReleaseClosure so failures could be escalated/canceled when the option is false. With the ctx parameter removed and the callback being DummyBrpcCallback, RPC failure/non-OK status() handling appears to be reduced to logging only, which can change query correctness/termination behavior.

Recommendation (mandatory): either

  1. restore ctx-based cancellation semantics in AutoReleaseClosure (capturing status/failure before invoking call()), or
  2. replace DummyBrpcCallback with a runtime-filter-specific callback that performs the required cancel/disable/sub behavior under the same option gating.
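
A minimal sketch of the first option, assuming simplified stand-in types: the closure copies the RPC outcome before `call()`, cancels only when a context was supplied, and still invokes `call()` last. `QueryContextSketch`, `ControllerSketch`, `CallbackSketch`, `ClosureSketch` and `gate_context` are all hypothetical and only mirror the shape of the code in this diff.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins; the real QueryContext and brpc::Controller differ.
struct QueryContextSketch {
    bool cancelled = false;
    std::string reason;
    void cancel(const std::string& why) {
        cancelled = true;
        reason = why;
    }
};

struct ControllerSketch {
    bool failed = false;
    std::string error_text;
};

struct CallbackSketch {
    std::shared_ptr<ControllerSketch> cntl = std::make_shared<ControllerSketch>();
    int calls = 0;
    // Simulates call() starting the next RPC and resetting the shared controller.
    void call() {
        ++calls;
        cntl->failed = false;
        cntl->error_text.clear();
    }
};

class ClosureSketch {
public:
    ClosureSketch(std::shared_ptr<CallbackSketch> cb, std::weak_ptr<QueryContextSketch> ctx)
            : _callback(cb), _ctx(std::move(ctx)) {
        assert(cb != nullptr);
        _cntl = cb->cntl;
    }

    void run() {
        // Copy the outcome before call(), which may overwrite the controller.
        const bool failed = _cntl->failed;
        const std::string error = _cntl->error_text;
        if (failed) {
            if (auto ctx = _ctx.lock()) {
                ctx->cancel("RPC meet failed: " + error);
            }
        }
        if (auto cb = _callback.lock()) {
            cb->call(); // must stay the last operation
        }
    }

private:
    std::shared_ptr<ControllerSketch> _cntl;
    std::weak_ptr<CallbackSketch> _callback;
    std::weak_ptr<QueryContextSketch> _ctx;
};

// Mirrors the removed call-site gating: an empty weak_ptr means "do not cancel".
inline std::weak_ptr<QueryContextSketch> gate_context(
        bool ignore_runtime_filter_error, const std::shared_ptr<QueryContextSketch>& ctx) {
    return ignore_runtime_filter_error ? std::weak_ptr<QueryContextSketch> {} : ctx;
}
```

Whether cancellation belongs in the closure at all is the design question this PR answers differently; the sketch only shows that the gating and the call-last ordering are compatible.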

auto* pquery_id = closure->request_->mutable_query_id();
pquery_id->set_hi(query_ctx->query_id().hi);
Expand Down Expand Up @@ -376,17 +374,13 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
}

if (is_ready) {
return _send_rf_to_target(cnt_val,
query_ctx->ignore_runtime_filter_error()
? std::weak_ptr<QueryContext> {}
: query_ctx,
merge_time, request->query_id(), query_ctx->execution_timeout());
return _send_rf_to_target(cnt_val, merge_time, request->query_id(),
query_ctx->execution_timeout());
}
return Status::OK();
}

Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& cnt_val,
std::weak_ptr<QueryContext> ctx,
int64_t merge_time,
PUniqueId query_id,
int execution_timeout) {
Expand Down Expand Up @@ -429,7 +423,7 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
DummyBrpcCallback<PPublishFilterResponse>>::
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
DummyBrpcCallback<PPublishFilterResponse>::create_shared());

closure->request_->set_merge_time(merge_time);
*closure->request_->mutable_query_id() = query_id;
4 changes: 2 additions & 2 deletions be/src/exec/runtime_filter/runtime_filter_mgr.h
Expand Up @@ -182,8 +182,8 @@ class RuntimeFilterMergeControllerEntity {
const std::vector<TRuntimeFilterTargetParamsV2>&& target_info,
const int producer_size);

Status _send_rf_to_target(GlobalMergeContext& cnt_val, std::weak_ptr<QueryContext> ctx,
int64_t merge_time, PUniqueId query_id, int execution_timeout);
Status _send_rf_to_target(GlobalMergeContext& cnt_val, int64_t merge_time, PUniqueId query_id,
int execution_timeout);

// protect _filter_map
std::shared_mutex _filter_map_mutex;
77 changes: 35 additions & 42 deletions be/src/exec/runtime_filter/runtime_filter_producer.cpp
Expand Up @@ -98,51 +98,43 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
return Status::OK();
}

class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>> {
std::shared_ptr<Dependency> _dependency;
// Use a weak ptr here: when the query context is destroyed, the runtime filter context must be
// destroyed too, otherwise its memory is not released. The RPC runs in another thread, so a strong
// reference would keep the rf context alive after the query context until the RPC returns.
std::weak_ptr<RuntimeFilterWrapper> _wrapper;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
friend class RuntimeFilterProducer;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);
// Callback for sync-size RPCs. Handles errors (disable wrapper + sub dependency) in call().
class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
ENABLE_FACTORY_CREATOR(SyncSizeCallback);

void _process_if_rpc_failed() override {
Defer defer {[&]() {
Base::_process_if_rpc_failed();
public:
SyncSizeCallback(std::shared_ptr<Dependency> dependency,
std::shared_ptr<RuntimeFilterWrapper> wrapper)
: _dependency(std::move(dependency)), _wrapper(wrapper) {}

void call() override {
Contributor

[Behavioral regression] The old SyncSizeClosure called Base::_process_if_rpc_failed() in its Defer, which would invoke ctx->cancel(Status::NetworkError("RPC meet failed: ...")) when ignore_runtime_filter_error == false. This cancelled the query on RF RPC failure.

The new SyncSizeCallback::call() gracefully degrades (disables the filter + subs dependency) but never cancels the query context, regardless of ignore_runtime_filter_error.

This breaks the existing regression test regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy (lines 80-86), which expects:

sql "set ignore_runtime_filter_error = false"
test {
    sql "...";
    exception "RPC meet failed"
}

The test expects the query to fail with "RPC meet failed" when the debug point injects an RPC failure and ignore_runtime_filter_error=false. With this PR, the query will succeed (with degraded performance) instead.

Recommendation: Either:

  1. Update the regression test to reflect the new behavior (query succeeds even on RF RPC failure), OR
  2. Add query-context cancellation logic to SyncSizeCallback::call() when ignore_runtime_filter_error == false (requires passing the flag or a weak QueryContext to the callback).

Option 1 seems more consistent with the PR's design philosophy of graceful degradation.
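
If option 2 were chosen instead, the callback could carry an optional weak reference to the query context. The following is only a sketch under that assumption: `QueryCtxStub`, `WrapperStub`, `DependencyStub` and `SyncSizeCallbackSketch` are hypothetical stand-ins, and the `rpc_failed` / `error_text` fields replace the real `cntl_` accessors.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for QueryContext, RuntimeFilterWrapper and
// CountedFinishDependency.
struct QueryCtxStub {
    bool cancelled = false;
    std::string reason;
    void cancel(const std::string& why) {
        cancelled = true;
        reason = why;
    }
};

struct WrapperStub {
    bool disabled = false;
    void disable() { disabled = true; }
};

struct DependencyStub {
    int pending = 1;
    void sub() {
        assert(pending > 0);
        --pending;
    }
};

class SyncSizeCallbackSketch {
public:
    SyncSizeCallbackSketch(std::shared_ptr<DependencyStub> dependency,
                           std::weak_ptr<WrapperStub> wrapper,
                           std::weak_ptr<QueryCtxStub> ctx)
            : _dependency(std::move(dependency)),
              _wrapper(std::move(wrapper)),
              _ctx(std::move(ctx)) {}

    // Stand-ins for cntl_->Failed() and cntl_->ErrorText().
    bool rpc_failed = false;
    std::string error_text;

    void call() {
        if (!rpc_failed) {
            return; // success: the later sync response subs the dependency
        }
        if (auto w = _wrapper.lock()) {
            w->disable();
        }
        _dependency->sub();
        // An empty weak_ptr (ignore_runtime_filter_error == true) skips this.
        if (auto ctx = _ctx.lock()) {
            ctx->cancel("RPC meet failed: " + error_text);
        }
    }

private:
    std::shared_ptr<DependencyStub> _dependency;
    std::weak_ptr<WrapperStub> _wrapper;
    std::weak_ptr<QueryCtxStub> _ctx;
};
```

With this shape the existing test expectation ("RPC meet failed") would presumably still hold when the flag is false, at the cost of reintroducing the QueryContext dependency that the PR removes from brpc_closure.h.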

// On error: disable the wrapper and sub the dependency here, because set_synced_size()
// will never be called (the merge node won't respond with a sync).
// On success: do NOT sub here. The merge node will respond with sync_filter_size,
// which calls set_synced_size() -> _dependency->sub().
if (cntl_->Failed()) {
if (auto w = _wrapper.lock()) {
w->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
}
((CountedFinishDependency*)_dependency.get())->sub();
}};
auto wrapper = _wrapper.lock();
if (!wrapper) {
return;
}

wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText());
}

void _process_if_meet_error_status(const Status& status) override {
Defer defer {[&]() {
Base::_process_if_meet_error_status(status);
Status status = Status::create(response_->status());
if (!status.ok()) {
if (auto w = _wrapper.lock()) {
w->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
}
((CountedFinishDependency*)_dependency.get())->sub();
}};
auto wrapper = _wrapper.lock();
if (!wrapper) {
return;
}

wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string());
}

public:
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<Dependency> dependency,
std::shared_ptr<RuntimeFilterWrapper> wrapper,
std::weak_ptr<QueryContext> context)
: Base(req, callback, context), _dependency(std::move(dependency)), _wrapper(wrapper) {}
private:
std::shared_ptr<Dependency> _dependency;
// Use a weak ptr here: when the query context is destroyed, the runtime filter context must be
// destroyed too, otherwise its memory is not released. The RPC runs in another thread, so a strong
// reference would keep the rf context alive after the query context until the RPC returns.
std::weak_ptr<RuntimeFilterWrapper> _wrapper;
};

void RuntimeFilterProducer::latch_dependency(
Expand Down Expand Up @@ -208,14 +200,15 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt

auto request = std::make_shared<PSendFilterSizeRequest>();
request->set_stage(_stage);

auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper);
// Store callback in the producer to keep it alive until the RPC completes.
// AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback
// would be destroyed when this function returns and error-path sub() would never fire.
_sync_size_callback = callback;
// The RuntimeFilter may be destroyed before the RPC finishes, so the closure cannot hold
// a raw pointer to it. It has to use the context's shared ptr.
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper,
state->query_options().ignore_runtime_filter_error
? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
auto closure = AutoReleaseClosure<PSendFilterSizeRequest, SyncSizeCallback>::create_unique(
request, callback);
Comment on lines +203 to +211
Copilot AI Mar 26, 2026

_sync_size_callback is a single slot, but send_size() can be invoked more than once during the producer lifecycle in some implementations. If a second call happens before the first RPC completes, this assignment overwrites the stored callback and can still allow the earlier callback to be destroyed early (reintroducing the 'weak_ptr callback dies before completion' failure).

Also, the stored callback is never cleared on completion, which can unnecessarily retain _dependency until producer teardown.

Recommendation (mandatory): make the storage support multiple in-flight RPCs (e.g., a vector/list keyed by RPC generation or request id), and clear the stored entry when the RPC completes (both success and error paths), e.g., by having the callback clear a weak back-reference to the producer or by clearing in the completion handler that observes the response.
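
One possible shape for such storage, as a rough sketch: a small map from a locally generated id to a type-erased `shared_ptr<void>`, released by the completion path. `InflightCallbacks`, `hold` and `release` are hypothetical names, and the id scheme is an assumption rather than anything the RPC layer provides.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical holder; shared_ptr<void> matches the type-erased member in this PR.
class InflightCallbacks {
public:
    // Stores a strong reference and returns the id the completion path must release.
    uint64_t hold(std::shared_ptr<void> callback) {
        assert(callback != nullptr);
        std::lock_guard<std::mutex> lock(_mutex);
        const uint64_t id = _next_id++;
        _held.emplace(id, std::move(callback));
        return id;
    }

    // Drops the strong reference. Returns false if the id was already released.
    bool release(uint64_t id) {
        std::shared_ptr<void> doomed; // destroyed after the lock is dropped
        {
            std::lock_guard<std::mutex> lock(_mutex);
            auto it = _held.find(id);
            if (it == _held.end()) {
                return false;
            }
            doomed = std::move(it->second);
            _held.erase(it);
        }
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _held.size();
    }

private:
    mutable std::mutex _mutex;
    uint64_t _next_id = 0;
    std::unordered_map<uint64_t, std::shared_ptr<void>> _held;
};
```

Releasing from inside `call()` should be safe in this arrangement as long as the closure still holds the `shared_ptr` obtained from `callback_.lock()` for the duration of the call, since that temporary keeps the callback alive until `call()` returns.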

auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(state->get_query_ctx()->query_id().hi);
pquery_id->set_lo(state->get_query_ctx()->query_id().lo);
5 changes: 5 additions & 0 deletions be/src/exec/runtime_filter/runtime_filter_producer.h
Expand Up @@ -181,6 +181,11 @@ class RuntimeFilterProducer : public RuntimeFilter {

int64_t _synced_size = -1;
std::shared_ptr<CountedFinishDependency> _dependency;
// Holds the SyncSizeCallback alive until the send_filter_size RPC completes.
// AutoReleaseClosure stores callbacks via weak_ptr, so without this the callback
// would be destroyed when send_size() returns, and error-path sub() would never fire.
// Type-erased because the callback type is defined in the .cpp file.
std::shared_ptr<void> _sync_size_callback;

std::atomic<State> _rf_state;
};
64 changes: 20 additions & 44 deletions be/src/util/brpc_closure.h
Expand Up @@ -19,10 +19,8 @@

#include <google/protobuf/stubs/common.h>

#include <atomic>
#include <utility>

#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"

Expand Down Expand Up @@ -84,9 +82,8 @@ class AutoReleaseClosure : public google::protobuf::Closure {
ENABLE_FACTORY_CREATOR(AutoReleaseClosure);

public:
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback,
std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {})
: request_(req), callback_(callback), context_(std::move(context)) {
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback)
: request_(req), callback_(callback) {
this->cntl_ = callback->cntl_;
this->response_ = callback->response_;
}
Expand All @@ -96,15 +93,15 @@ class AutoReleaseClosure : public google::protobuf::Closure {
// Will delete itself
void Run() override {
Defer defer {[&]() { delete this; }};
// If lock() fails, the callback object has already been destroyed, so there is
// nothing left to do with it.
if (auto tmp = callback_.lock()) {
tmp->call();
}
// Don't do heavy work here. All heavy work belongs in the callback's call(), i.e. in its success/failure handlers.
if (cntl_->Failed()) {
_process_if_rpc_failed();
LOG(WARNING) << "brpc failed: " << cntl_->ErrorText();
} else {
_process_status<ResponseType>(response_.get());
_log_error_status<ResponseType>(response_.get());
}
// this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_)
if (auto tmp = callback_.lock()) {
tmp->call();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Observation - looks good] The reordering is correct: log/check state first, then call() last. The comment clearly documents why call() must be last. This is the core fix and it correctly addresses the data race.
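
To illustrate why the ordering matters, here is a single-threaded sketch with hypothetical names (`ReusableCallback`, `run_call_first`, `run_call_last`). It assumes `call()` may reset state shared with the closure, as happens when the callback is reused for the next RPC. The real problem is a cross-thread race; this sketch only reproduces the stale read deterministically.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical callback whose call() reuses (and therefore resets) shared state.
struct ReusableCallback {
    std::shared_ptr<std::string> error = std::make_shared<std::string>();
    void call() { error->clear(); }
};

// Old ordering: call() first, then inspect the shared state.
inline std::string run_call_first(const std::shared_ptr<ReusableCallback>& cb) {
    assert(cb != nullptr);
    std::weak_ptr<ReusableCallback> weak = cb;
    std::shared_ptr<std::string> error = cb->error;
    if (auto tmp = weak.lock()) {
        tmp->call();
    }
    return *error; // already overwritten by call()
}

// New ordering: inspect the shared state first, call() last.
inline std::string run_call_last(const std::shared_ptr<ReusableCallback>& cb) {
    assert(cb != nullptr);
    std::weak_ptr<ReusableCallback> weak = cb;
    std::shared_ptr<std::string> error = cb->error;
    const std::string observed = *error;
    if (auto tmp = weak.lock()) {
        tmp->call();
    }
    return observed;
}
```

In the sketch, `run_call_first` loses the error text that `run_call_last` still reports, which is the single-threaded analogue of reading `cntl_` or `response_` after the callback has been handed to another RPC.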

}
}

Expand All @@ -116,45 +113,24 @@ class AutoReleaseClosure : public google::protobuf::Closure {
// at any stage.
std::shared_ptr<Request> request_;
std::shared_ptr<ResponseType> response_;
std::string error_msg_;

protected:
virtual void _process_if_rpc_failed() {
std::string error_msg =
fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_);
if (auto ctx = context_.lock(); ctx) {
ctx->cancel(Status::NetworkError(error_msg));
} else {
LOG(WARNING) << error_msg;
}
}

virtual void _process_if_meet_error_status(const Status& status) {
if (status.is<ErrorCode::END_OF_FILE>()) {
// no need to log END_OF_FILE; this cuts down on useless log output
return;
}
if (auto ctx = context_.lock(); ctx) {
ctx->cancel(status);
} else {
LOG(WARNING) << "RPC meet error status: " << status;
}
}

private:
template <typename Response>
void _process_status(Response* response) {}

template <HasStatus Response>
void _process_status(Response* response) {
void _log_error_status(Response* response) {
if (Status status = Status::create(response->status()); !status.ok()) {
_process_if_meet_error_status(status);
if (!status.is<ErrorCode::END_OF_FILE>()) {
LOG(WARNING) << "RPC meet error status: " << status;
}
}
}
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main
// thread is freed.

template <typename Response>
requires(!HasStatus<Response>)
void _log_error_status(Response* /*response*/) {
// Response type has no status() method, nothing to log.
}
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main thread is freed.
Weak callback_;
std::weak_ptr<QueryContext> context_;
};

} // namespace doris