diff --git a/be/src/exec/exchange/vdata_stream_sender.h b/be/src/exec/exchange/vdata_stream_sender.h index 29b6f400235ebd..9636c752d3d16c 100644 --- a/be/src/exec/exchange/vdata_stream_sender.h +++ b/be/src/exec/exchange/vdata_stream_sender.h @@ -164,6 +164,7 @@ class Channel { std::shared_ptr> get_send_callback(RpcInstance* ins, bool eos) { + // Reuse the callback here because its reconstruction may be expensive due to the many captured parameters. if (!_send_callback) { _send_callback = ExchangeSendCallback::create_shared(); } else { diff --git a/be/src/exec/operator/exchange_sink_buffer.cpp b/be/src/exec/operator/exchange_sink_buffer.cpp index 22a316c9196df0..afb4abd0a08ece 100644 --- a/be/src/exec/operator/exchange_sink_buffer.cpp +++ b/be/src/exec/operator/exchange_sink_buffer.cpp @@ -347,6 +347,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { } // The eos here only indicates that the current exchange sink has reached eos. // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent. + // `_send_rpc` must be the LAST operation in this function, because it may reuse the callback! s = _send_rpc(ins); if (!s) { _failed(ins.id, @@ -473,9 +474,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { } else if (eos) { _ended(ins); } - // The eos here only indicates that the current exchange sink has reached eos. // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent. + // `_send_rpc` must be the LAST operation in this function, because it may reuse the callback! 
s = _send_rpc(ins); if (!s) { _failed(ins.id, diff --git a/be/src/exec/runtime_filter/runtime_filter.cpp b/be/src/exec/runtime_filter/runtime_filter.cpp index 8ee920d3c3357b..b5f90dbaa6158c 100644 --- a/be/src/exec/runtime_filter/runtime_filter.cpp +++ b/be/src/exec/runtime_filter/runtime_filter.cpp @@ -39,10 +39,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress auto merge_filter_callback = DummyBrpcCallback::create_shared(); auto merge_filter_closure = AutoReleaseClosure>:: - create_unique(merge_filter_request, merge_filter_callback, - state->query_options().ignore_runtime_filter_error - ? std::weak_ptr {} - : state->get_query_ctx_weak()); + create_unique(merge_filter_request, merge_filter_callback); void* data = nullptr; int len = 0; diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index e0695cb2440686..7984568477edca 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -259,8 +259,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptradd_rf_size(request->filter_size())) { - auto ctx = query_ctx->ignore_runtime_filter_error() ? std::weak_ptr {} - : query_ctx; for (auto addr : cnt_val.source_addrs) { std::shared_ptr stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr)); @@ -277,7 +275,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr>:: create_unique(sync_request, - DummyBrpcCallback::create_shared(), ctx); + DummyBrpcCallback::create_shared()); auto* pquery_id = closure->request_->mutable_query_id(); pquery_id->set_hi(query_ctx->query_id().hi); @@ -376,17 +374,13 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q } if (is_ready) { - return _send_rf_to_target(cnt_val, - query_ctx->ignore_runtime_filter_error() - ? 
std::weak_ptr {} - : query_ctx, - merge_time, request->query_id(), query_ctx->execution_timeout()); + return _send_rf_to_target(cnt_val, merge_time, request->query_id(), + query_ctx->execution_timeout()); } return Status::OK(); } Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& cnt_val, - std::weak_ptr ctx, int64_t merge_time, PUniqueId query_id, int execution_timeout) { @@ -429,7 +423,7 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext auto closure = AutoReleaseClosure>:: create_unique(std::make_shared(apply_request), - DummyBrpcCallback::create_shared(), ctx); + DummyBrpcCallback::create_shared()); closure->request_->set_merge_time(merge_time); *closure->request_->mutable_query_id() = query_id; diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index 9959cb567a4171..e0d85a0c24ace8 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -182,8 +182,8 @@ class RuntimeFilterMergeControllerEntity { const std::vector&& target_info, const int producer_size); - Status _send_rf_to_target(GlobalMergeContext& cnt_val, std::weak_ptr ctx, - int64_t merge_time, PUniqueId query_id, int execution_timeout); + Status _send_rf_to_target(GlobalMergeContext& cnt_val, int64_t merge_time, PUniqueId query_id, + int execution_timeout); // protect _filter_map std::shared_mutex _filter_map_mutex; diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp b/be/src/exec/runtime_filter/runtime_filter_producer.cpp index 4fbd170428802f..2fe534f038ac2f 100644 --- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp @@ -98,53 +98,6 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table return Status::OK(); } -class SyncSizeClosure : public AutoReleaseClosure> { - std::shared_ptr _dependency; - // Should use 
weak ptr here, because when query context deconstructs, should also delete runtime filter - // context, it not the memory is not released. And rpc is in another thread, it will hold rf context - // after query context because the rpc is not returned. - std::weak_ptr _wrapper; - using Base = - AutoReleaseClosure>; - friend class RuntimeFilterProducer; - ENABLE_FACTORY_CREATOR(SyncSizeClosure); - - void _process_if_rpc_failed() override { - Defer defer {[&]() { - Base::_process_if_rpc_failed(); - ((CountedFinishDependency*)_dependency.get())->sub(); - }}; - auto wrapper = _wrapper.lock(); - if (!wrapper) { - return; - } - - wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText()); - } - - void _process_if_meet_error_status(const Status& status) override { - Defer defer {[&]() { - Base::_process_if_meet_error_status(status); - ((CountedFinishDependency*)_dependency.get())->sub(); - }}; - auto wrapper = _wrapper.lock(); - if (!wrapper) { - return; - } - - wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string()); - } - -public: - SyncSizeClosure(std::shared_ptr req, - std::shared_ptr> callback, - std::shared_ptr dependency, - std::shared_ptr wrapper, - std::weak_ptr context) - : Base(req, callback, context), _dependency(std::move(dependency)), _wrapper(wrapper) {} -}; - void RuntimeFilterProducer::latch_dependency( const std::shared_ptr& dependency) { std::unique_lock l(_rmtx); @@ -208,14 +161,16 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt auto request = std::make_shared(); request->set_stage(_stage); - - auto callback = DummyBrpcCallback::create_shared(); + auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper, + state->get_query_ctx()->weak_from_this()); + // Store callback in the producer to keep it alive until the RPC completes. 
+ // AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback + // would be destroyed when this function returns and error-path sub() would never fire. + _sync_size_callback = callback; // RuntimeFilter maybe deconstructed before the rpc finished, so that could not use // a raw pointer in closure. Has to use the context's shared ptr. - auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper, - state->query_options().ignore_runtime_filter_error - ? std::weak_ptr {} - : state->get_query_ctx_weak()); + auto closure = AutoReleaseClosure::create_unique( + request, callback); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(state->get_query_ctx()->query_id().hi); pquery_id->set_lo(state->get_query_ctx()->query_id().lo); diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.h b/be/src/exec/runtime_filter/runtime_filter_producer.h index 433396144a5209..682d97250b24bf 100644 --- a/be/src/exec/runtime_filter/runtime_filter_producer.h +++ b/be/src/exec/runtime_filter/runtime_filter_producer.h @@ -26,6 +26,58 @@ namespace doris { #include "common/compile_check_begin.h" + +// Callback for sync-size RPCs. Handles errors (disable wrapper + sub dependency) in call(). +class SyncSizeCallback : public DummyBrpcCallback { + ENABLE_FACTORY_CREATOR(SyncSizeCallback); + +public: + SyncSizeCallback(std::shared_ptr dependency, + std::shared_ptr wrapper, + std::weak_ptr context) + : _dependency(std::move(dependency)), _wrapper(wrapper), _context(std::move(context)) {} + + void call() override { + // On error: disable the wrapper and sub the dependency here, because set_synced_size() + // will never be called (the merge node won't respond with a sync). + // On success: do NOT sub here. The merge node will respond with sync_filter_size, + // which calls set_synced_size() -> _dependency->sub(). 
+ if (cntl_->Failed()) { + if (auto w = _wrapper.lock()) { + w->set_state(RuntimeFilterWrapper::State::DISABLED, cntl_->ErrorText()); + } + if (auto ctx = _context.lock()) { + if (!ctx->ignore_runtime_filter_error()) { + ctx->cancel(Status::NetworkError("RPC meet failed: {}", cntl_->ErrorText())); + } + } + ((CountedFinishDependency*)_dependency.get())->sub(); + return; + } + + Status status = Status::create(response_->status()); + if (!status.ok()) { + if (auto w = _wrapper.lock()) { + w->set_state(RuntimeFilterWrapper::State::DISABLED, status.to_string()); + } + if (auto ctx = _context.lock()) { + if (!ctx->ignore_runtime_filter_error()) { + ctx->cancel(Status::NetworkError("RPC meet failed: {}", status.to_string())); + } + } + ((CountedFinishDependency*)_dependency.get())->sub(); + } + } + +private: + std::shared_ptr _dependency; + // Use a weak ptr here: when the query context is destroyed, the runtime filter context must be + // released as well, otherwise its memory is never freed. The RPC runs in another thread and, if it + // held a strong reference, would keep the rf context alive after the query context until it returns. + std::weak_ptr _wrapper; + std::weak_ptr _context; +}; + // Work on (hash/corss) join build sink node, RuntimeFilterProducerHelper will manage all RuntimeFilterProducer // Used to generate specific predicate and publish it to consumer/merger /** @@ -181,6 +233,10 @@ class RuntimeFilterProducer : public RuntimeFilter { int64_t _synced_size = -1; std::shared_ptr _dependency; + // Holds the SyncSizeCallback alive until the send_filter_size RPC completes. + // AutoReleaseClosure stores callbacks via weak_ptr, so without this the callback + // would be destroyed when send_size() returns, and error-path sub() would never fire. 
+ std::shared_ptr _sync_size_callback; std::atomic _rf_state; }; diff --git a/be/src/util/brpc_closure.h b/be/src/util/brpc_closure.h index e981b185fc0015..a261125501b999 100644 --- a/be/src/util/brpc_closure.h +++ b/be/src/util/brpc_closure.h @@ -19,10 +19,8 @@ #include -#include #include -#include "runtime/query_context.h" #include "runtime/thread_context.h" #include "service/brpc.h" @@ -84,9 +82,8 @@ class AutoReleaseClosure : public google::protobuf::Closure { ENABLE_FACTORY_CREATOR(AutoReleaseClosure); public: - AutoReleaseClosure(std::shared_ptr req, std::shared_ptr callback, - std::weak_ptr context = {}, std::string_view error_msg = {}) - : request_(req), callback_(callback), context_(std::move(context)) { + AutoReleaseClosure(std::shared_ptr req, std::shared_ptr callback) + : request_(req), callback_(callback) { this->cntl_ = callback->cntl_; this->response_ = callback->response_; } @@ -96,15 +93,15 @@ class AutoReleaseClosure : public google::protobuf::Closure { // Will delete itself void Run() override { Defer defer {[&]() { delete this; }}; - // If lock failed, it means the callback object is deconstructed, then no need - // to deal with the callback any more. - if (auto tmp = callback_.lock()) { - tmp->call(); - } + // shouldn't do heavy work here. all heavy work should be done in callback's call() (which means in success/failure handlers) if (cntl_->Failed()) { - _process_if_rpc_failed(); + LOG(WARNING) << "brpc failed: " << cntl_->ErrorText(); } else { - _process_status(response_.get()); + _log_error_status(response_.get()); + } + // this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_) + if (auto tmp = callback_.lock()) { + tmp->call(); } } @@ -116,45 +113,24 @@ class AutoReleaseClosure : public google::protobuf::Closure { // at any stage. 
std::shared_ptr request_; std::shared_ptr response_; - std::string error_msg_; - -protected: - virtual void _process_if_rpc_failed() { - std::string error_msg = - fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_); - if (auto ctx = context_.lock(); ctx) { - ctx->cancel(Status::NetworkError(error_msg)); - } else { - LOG(WARNING) << error_msg; - } - } - - virtual void _process_if_meet_error_status(const Status& status) { - if (status.is()) { - // no need to log END_OF_FILE, reduce the unlessful log - return; - } - if (auto ctx = context_.lock(); ctx) { - ctx->cancel(status); - } else { - LOG(WARNING) << "RPC meet error status: " << status; - } - } private: - template - void _process_status(Response* response) {} - template - void _process_status(Response* response) { + void _log_error_status(Response* response) { if (Status status = Status::create(response->status()); !status.ok()) { - _process_if_meet_error_status(status); + if (!status.is()) { + LOG(WARNING) << "RPC meet error status: " << status; + } } } - // Use a weak ptr to keep the callback, so that the callback can be deleted if the main - // thread is freed. + + template + requires(!HasStatus) + void _log_error_status(Response* /*response*/) { + // Response type has no status() method, nothing to log. + } + // Use a weak ptr to keep the callback, so that the callback can be deleted if the main thread is freed. Weak callback_; - std::weak_ptr context_; }; } // namespace doris diff --git a/be/test/exec/exchange/exchange_sink_test.cpp b/be/test/exec/exchange/exchange_sink_test.cpp index 7fa68c67003975..18701214ee107a 100644 --- a/be/test/exec/exchange/exchange_sink_test.cpp +++ b/be/test/exec/exchange/exchange_sink_test.cpp @@ -234,4 +234,138 @@ TEST_F(ExchangeSinkTest, test_queue_size) { } } +// Callback that records the state of response_ and cntl_ at the moment call() is invoked, +// then mutates them (simulating callback reuse triggering a new RPC). This lets us verify: +// 1. 
call() was invoked +// 2. The callback saw the correct original state (before any mutation) +// 3. After call(), the shared objects are mutated (so any code reading them after call() +// would see wrong values — this is the bug the fix prevents) +template +class StateCapturingCallback : public DummyBrpcCallback { + ENABLE_FACTORY_CREATOR(StateCapturingCallback); + +public: + StateCapturingCallback() = default; + + enum class MutateAction { + WRITE_ERROR, // Write an error status into response + CLEAR_STATUS, // Clear the status field + RESET_CNTL, // Call cntl_->Reset() + }; + + void set_mutate_action(MutateAction action) { _action = action; } + + void call() override { + call_invoked = true; + // Capture state BEFORE mutation — this is what call() sees. + cntl_failed_at_call_time = this->cntl_->Failed(); + if (this->cntl_->Failed()) { + cntl_error_at_call_time = this->cntl_->ErrorText(); + } + response_status_at_call_time = Status::create(this->response_->status()); + + // Now mutate (simulating callback reuse / new RPC) + switch (_action) { + case MutateAction::WRITE_ERROR: { + Status err = Status::InternalError("injected by callback reuse"); + err.to_protobuf(this->response_->mutable_status()); + break; + } + case MutateAction::CLEAR_STATUS: { + this->response_->mutable_status()->set_status_code(0); + this->response_->mutable_status()->clear_error_msgs(); + break; + } + case MutateAction::RESET_CNTL: { + this->cntl_->Reset(); + break; + } + } + } + + // Observable state + bool call_invoked = false; + bool cntl_failed_at_call_time = false; + std::string cntl_error_at_call_time; + Status response_status_at_call_time; + +private: + MutateAction _action = MutateAction::WRITE_ERROR; +}; + +using TestCallback = StateCapturingCallback; + +// Test: Response starts OK. call() writes an error into it. +// With correct ordering (log-before-call): the closure's logging sees OK (no warning), +// then call() runs and the callback captures the OK status at call time. 
+// With WRONG ordering (call-before-log): call() writes error first, then the closure +// would log the error — a false positive. We verify call() saw OK at invocation time, +// proving it ran after (or at least not before) the status was checked by the closure. +TEST_F(ExchangeSinkTest, test_closure_call_sees_original_ok_response) { + auto callback = TestCallback::create_shared(); + // Response starts OK (default). + callback->set_mutate_action(TestCallback::MutateAction::WRITE_ERROR); + + auto req = std::make_shared(); + auto* closure = new AutoReleaseClosure(req, callback); + + closure->Run(); // self-deletes + + EXPECT_TRUE(callback->call_invoked) << "call() should have been invoked"; + EXPECT_TRUE(callback->response_status_at_call_time.ok()) + << "call() must see the original OK response status. " + "If it saw an error, the ordering is wrong."; + EXPECT_FALSE(callback->cntl_failed_at_call_time); +} + +// Test: Response starts with an error. call() clears it. +// With correct ordering: closure logs the error first, then call() runs and sees the error +// at invocation time (before clearing it). +// With WRONG ordering: call() clears the error first, then the closure sees OK — error +// silently swallowed. We verify call() saw the error, proving the closure read it first. +TEST_F(ExchangeSinkTest, test_closure_call_sees_original_error_response) { + auto callback = TestCallback::create_shared(); + // Set error status on the response BEFORE Run(). + Status err = Status::InternalError("original RPC error"); + err.to_protobuf(callback->response_->mutable_status()); + callback->set_mutate_action(TestCallback::MutateAction::CLEAR_STATUS); + + auto req = std::make_shared(); + auto* closure = new AutoReleaseClosure(req, callback); + + closure->Run(); + + EXPECT_TRUE(callback->call_invoked) << "call() should have been invoked"; + EXPECT_FALSE(callback->response_status_at_call_time.ok()) + << "call() must see the original error status before clearing it. 
" + "If it saw OK, the ordering is wrong — call() cleared it before the closure " + "could log the error, silently swallowing it."; +} + +// Test: Controller is failed. call() resets it. +// With correct ordering: closure checks cntl_->Failed() first (true), logs the error, +// then call() runs and sees Failed() == true before resetting. +// With WRONG ordering: call() resets the controller first, then the closure sees +// Failed() == false — RPC failure silently lost. We verify call() saw the failure. +TEST_F(ExchangeSinkTest, test_closure_call_sees_original_rpc_failure) { + auto callback = TestCallback::create_shared(); + // Mark the controller as failed BEFORE Run(). + callback->cntl_->SetFailed("simulated network timeout"); + callback->set_mutate_action(TestCallback::MutateAction::RESET_CNTL); + + auto req = std::make_shared(); + auto* closure = new AutoReleaseClosure(req, callback); + + closure->Run(); + + EXPECT_TRUE(callback->call_invoked) << "call() should have been invoked"; + EXPECT_TRUE(callback->cntl_failed_at_call_time) + << "call() must see cntl_->Failed() == true before resetting the controller. " + "If it saw false, the ordering is wrong — call() reset the controller before " + "the closure could detect the failure."; + EXPECT_NE(callback->cntl_error_at_call_time.find("simulated network timeout"), + std::string::npos) + << "call() must see the original error text"; +} + } // namespace doris diff --git a/be/test/exec/runtime_filter/sync_size_callback_test.cpp b/be/test/exec/runtime_filter/sync_size_callback_test.cpp new file mode 100644 index 00000000000000..5fe6daf19f2528 --- /dev/null +++ b/be/test/exec/runtime_filter/sync_size_callback_test.cpp @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "exec/runtime_filter/runtime_filter_producer.h" +#include "exec/runtime_filter/runtime_filter_test_utils.h" + +namespace doris { + +class SyncSizeCallbackTest : public RuntimeFilterTest { +protected: + void SetUp() override { + RuntimeFilterTest::SetUp(); + _dependency = std::make_shared(0, 0, "TEST_DEP"); + _dependency->add(); + _wrapper = std::make_shared( + PrimitiveType::TYPE_INT, RuntimeFilterType::BLOOM_FILTER, /*filter_id=*/0, + RuntimeFilterWrapper::State::UNINITED); + } + + std::shared_ptr make_query_ctx(bool ignore_rf_error) { + auto opts = TQueryOptionsBuilder().build(); + opts.__set_ignore_runtime_filter_error(ignore_rf_error); + auto fe_address = TNetworkAddress(); + fe_address.hostname = LOCALHOST; + fe_address.port = DUMMY_PORT; + return QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), opts, fe_address, true, + fe_address, QuerySource::INTERNAL_FRONTEND); + } + + std::shared_ptr make_callback( + std::weak_ptr ctx = {}, + std::shared_ptr dep = nullptr, + std::shared_ptr wrapper = nullptr) { + return SyncSizeCallback::create_shared(dep ? dep : _dependency, + wrapper ? 
wrapper : _wrapper, std::move(ctx)); + } + + std::shared_ptr _dependency; + std::shared_ptr _wrapper; +}; + +// ==================== cntl_->Failed() path ==================== + +TEST_F(SyncSizeCallbackTest, rpc_fail_cancels_query_when_ignore_rf_error_false) { + auto ctx = make_query_ctx(false); + auto callback = make_callback(ctx); + callback->cntl_->SetFailed("injected failure"); + + callback->call(); + + // Query should be cancelled + ASSERT_TRUE(ctx->is_cancelled()); + // Wrapper should be disabled + ASSERT_EQ(_wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + // Dependency should have been subbed (counter back to 0 -> ready) + ASSERT_TRUE(_dependency->ready()); +} + +TEST_F(SyncSizeCallbackTest, rpc_fail_does_not_cancel_query_when_ignore_rf_error_true) { + auto ctx = make_query_ctx(true); + auto callback = make_callback(ctx); + callback->cntl_->SetFailed("injected failure"); + + callback->call(); + + // Query should NOT be cancelled + ASSERT_FALSE(ctx->is_cancelled()); + // Wrapper should still be disabled + ASSERT_EQ(_wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + // Dependency should have been subbed + ASSERT_TRUE(_dependency->ready()); +} + +// ==================== response error status path ==================== + +TEST_F(SyncSizeCallbackTest, response_error_cancels_query_when_ignore_rf_error_false) { + auto ctx = make_query_ctx(false); + auto callback = make_callback(ctx); + // Set a non-OK status in the response + auto* status_pb = callback->response_->mutable_status(); + status_pb->set_status_code(TStatusCode::INTERNAL_ERROR); + status_pb->add_error_msgs("injected response error"); + + callback->call(); + + // Query should be cancelled + ASSERT_TRUE(ctx->is_cancelled()); + // Wrapper should be disabled + ASSERT_EQ(_wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + // Dependency should have been subbed + ASSERT_TRUE(_dependency->ready()); +} + +TEST_F(SyncSizeCallbackTest, 
response_error_does_not_cancel_query_when_ignore_rf_error_true) { + auto ctx = make_query_ctx(true); + auto callback = make_callback(ctx); + auto* status_pb = callback->response_->mutable_status(); + status_pb->set_status_code(TStatusCode::INTERNAL_ERROR); + status_pb->add_error_msgs("injected response error"); + + callback->call(); + + // Query should NOT be cancelled + ASSERT_FALSE(ctx->is_cancelled()); + // Wrapper should still be disabled + ASSERT_EQ(_wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + // Dependency should have been subbed + ASSERT_TRUE(_dependency->ready()); +} + +// ==================== success path ==================== + +TEST_F(SyncSizeCallbackTest, success_path_no_cancel_no_sub) { + auto ctx = make_query_ctx(false); + auto callback = make_callback(ctx); + // Default: cntl_ not failed, response status OK (status_code defaults to 0 = OK) + + callback->call(); + + // Query should NOT be cancelled + ASSERT_FALSE(ctx->is_cancelled()); + // Wrapper should NOT be disabled + ASSERT_NE(_wrapper->get_state(), RuntimeFilterWrapper::State::DISABLED); + // Dependency should NOT have been subbed (still blocked) + ASSERT_FALSE(_dependency->ready()); +} + +// ==================== expired weak_ptr paths ==================== + +TEST_F(SyncSizeCallbackTest, rpc_fail_with_expired_query_context_no_crash) { + // Create a temporary QueryContext that will be destroyed + std::weak_ptr expired_ctx; + { + auto fe_address = TNetworkAddress(); + fe_address.hostname = LOCALHOST; + fe_address.port = DUMMY_PORT; + auto tmp_ctx = + QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), _query_options, + fe_address, true, fe_address, QuerySource::INTERNAL_FRONTEND); + expired_ctx = tmp_ctx; + } + // expired_ctx is now expired + + auto callback = make_callback(expired_ctx); + callback->cntl_->SetFailed("injected failure"); + + // Should not crash + callback->call(); + + // Wrapper should be disabled + ASSERT_EQ(_wrapper->get_state(), 
RuntimeFilterWrapper::State::DISABLED); + // Dependency should have been subbed + ASSERT_TRUE(_dependency->ready()); +} + +TEST_F(SyncSizeCallbackTest, rpc_fail_with_expired_wrapper_no_crash) { + auto ctx = make_query_ctx(false); + auto tmp_wrapper = std::make_shared( + PrimitiveType::TYPE_INT, RuntimeFilterType::BLOOM_FILTER, /*filter_id=*/1, + RuntimeFilterWrapper::State::UNINITED); + auto callback = make_callback(ctx, _dependency, tmp_wrapper); + // Destroy the wrapper before calling + tmp_wrapper.reset(); + + callback->cntl_->SetFailed("injected failure"); + + // Should not crash even though wrapper is expired + callback->call(); + + // Query should be cancelled (ignore_runtime_filter_error defaults to false) + ASSERT_TRUE(ctx->is_cancelled()); + // Dependency should have been subbed + ASSERT_TRUE(_dependency->ready()); +} + +} // namespace doris