From 22d5cd505667c1dce780eb028bf6ab139a83ddd6 Mon Sep 17 00:00:00 2001 From: Mryange Date: Thu, 7 May 2026 10:07:16 +0800 Subject: [PATCH 1/3] [refine](exec) add SubQueue abstraction and thread-safety annotations to DataQueue (#62947) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The DataQueue implementation used parallel vectors (one per child queue — lock, blocks, counters, etc.) scattered across the class, making it hard to reason about which lock protects which field. The old INJECT_MOCK_SLEEP pattern injected test randomness through a macro wrapping std::lock_guard, but this was fragile and didn't compose with Clang's -Wthread-safety static analysis. Several methods (e.g., set_source_block) performed lock-free checks followed by locked re-checks, risking subtle races. Root cause: per-child state was not encapsulated, lock/field relationships were implicit, and no static analysis guarded them. This PR: - Introduces a SubQueue struct that groups all per-child state (queue lock + blocks, free lock + free blocks, counters, sink dependency pointer) with explicit GUARDED_BY annotations. - Adds a new be/src/common/thread_safety_annotations.h header providing thread-safety macros, an AnnotatedMutex wrapper, and a LockGuard that replaces std::lock_guard. In BE_TEST builds, LockGuard injects random sleep before lock acquisition and after release to exercise concurrent code paths — replacing the old INJECT_MOCK_SLEEP macro. - Enables -Wthread-safety in Clang builds. - Moves dependency notifications (set_ready, block, set_always_ready) outside the queue lock in try_pop/try_push/clear_blocks to avoid nested lock ordering issues. - Fixes set_source_block to always hold _source_lock when reading _source_dependency, eliminating the lock-free pre-check. - Adds 20+ new unit tests covering SubQueue methods and DataQueue edge cases (empty pop, push-after-finished, capacity blocking, finish idempotency, clear_blocks, low-memory mode, terminate, free-block reuse, child_idx routing). 
(cherry picked from commit efd7067464e9880f98f79fe805e9f4d24381e9e4) --- be/CMakeLists.txt | 5 +- be/src/common/thread_safety_annotations.h | 120 +++++++++ be/src/exec/operator/data_queue.cpp | 243 ++++++++++------- be/src/exec/operator/data_queue.h | 111 ++++---- be/test/exec/pipeline/data_queue_test.cpp | 309 +++++++++++++++++++++- 5 files changed, 628 insertions(+), 160 deletions(-) create mode 100644 be/src/common/thread_safety_annotations.h diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 1de3541d3eb449..d9f9c4bce82a4e 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -335,8 +335,9 @@ if (COMPILER_CLANG) -Wunused-template -Wunused-member-function -Wunused-macros - -Wconversion) - add_compile_options( -Wno-gnu-statement-expression + -Wconversion + -Wthread-safety) + add_compile_options(-Wno-gnu-statement-expression -Wno-implicit-float-conversion -Wno-sign-conversion ) diff --git a/be/src/common/thread_safety_annotations.h b/be/src/common/thread_safety_annotations.h new file mode 100644 index 00000000000000..41c50711db3e0c --- /dev/null +++ b/be/src/common/thread_safety_annotations.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Thread safety annotation macros and annotated mutex wrappers for +// Clang's -Wthread-safety static analysis. +// Reference: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html + +#pragma once + +#include + +#ifdef BE_TEST +namespace doris { +void mock_random_sleep(); +} // namespace doris +#endif + +// Enable thread safety attributes only with clang. +// The attributes can be safely erased when compiling with other compilers. +#if defined(__clang__) && (!defined(SWIG)) +#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x)) +#else +#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op +#endif + +#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x)) + +#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable) + +#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x)) + +#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x)) + +#define ACQUIRED_BEFORE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__)) + +#define ACQUIRED_AFTER(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__)) + +#define REQUIRES(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__)) + +#define REQUIRES_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__)) + +#define ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__)) + +#define ACQUIRE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__)) + +#define RELEASE(...) THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__)) + +#define RELEASE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__)) + +#define TRY_ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__)) + +#define TRY_ACQUIRE_SHARED(...) \ + THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__)) + +#define EXCLUDES(...) 
THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__)) + +#define ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x)) + +#define ASSERT_SHARED_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x)) + +#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x)) + +#define NO_THREAD_SAFETY_ANALYSIS THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis) + +// Annotated mutex wrapper for use with Clang thread safety analysis. +// Wraps std::mutex and provides the CAPABILITY annotation so that +// GUARDED_BY / REQUIRES / etc. annotations can reference it. +class CAPABILITY("mutex") AnnotatedMutex { +public: + void lock() ACQUIRE() { _mutex.lock(); } + void unlock() RELEASE() { _mutex.unlock(); } + bool try_lock() TRY_ACQUIRE(true) { return _mutex.try_lock(); } + + // Access the underlying std::mutex (e.g., for std::condition_variable). + // Use with care — this bypasses thread safety annotations. + std::mutex& native_handle() { return _mutex; } + +private: + std::mutex _mutex; +}; + +// RAII scoped lock guard annotated for thread safety analysis. +// In BE_TEST builds, injects a random sleep before acquiring and after +// releasing the lock to exercise concurrent code paths. 
+template +class SCOPED_CAPABILITY LockGuard { +public: + explicit LockGuard(MutexType& mu) ACQUIRE(mu) : _mu(mu) { +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + _mu.lock(); + } + ~LockGuard() RELEASE() { + _mu.unlock(); +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + } + + LockGuard(const LockGuard&) = delete; + LockGuard& operator=(const LockGuard&) = delete; + +private: + MutexType& _mu; +}; diff --git a/be/src/exec/operator/data_queue.cpp b/be/src/exec/operator/data_queue.cpp index 84ba08f309f983..4147a5e6da37ff 100644 --- a/be/src/exec/operator/data_queue.cpp +++ b/be/src/exec/operator/data_queue.cpp @@ -20,43 +20,124 @@ #include #include -#include #include +#include "common/thread_safety_annotations.h" #include "core/block/block.h" #include "exec/pipeline/dependency.h" namespace doris { #include "common/compile_check_begin.h" -DataQueue::DataQueue(int child_count) - : _queue_blocks_lock(child_count), - _queue_blocks(child_count), - _free_blocks_lock(child_count), - _free_blocks(child_count), - _child_count(child_count), - _is_finished(child_count), - _is_canceled(child_count), - _cur_bytes_in_queue(child_count), - _cur_blocks_nums_in_queue(child_count), - _flag_queue_idx(0) { - for (int i = 0; i < child_count; ++i) { - _queue_blocks_lock[i].reset(new std::mutex()); - _free_blocks_lock[i].reset(new std::mutex()); - _is_finished[i] = false; - _is_canceled[i] = false; - _cur_bytes_in_queue[i] = 0; - _cur_blocks_nums_in_queue[i] = 0; + +void SubQueue::try_pop(std::unique_ptr* output_block) { + bool need_notify_sink_ready = false; + { + LockGuard l(queue_lock); + if (!blocks.empty()) { + *output_block = std::move(blocks.front()); + blocks.pop_front(); + bytes_in_queue -= (*output_block)->allocated_bytes(); + blocks_in_queue -= 1; + need_notify_sink_ready = blocks.empty(); + } + } + // Notify outside of queue_lock to avoid nested locks. 
+ if (need_notify_sink_ready) { + sink_dependency->set_ready(); + } +} + +bool SubQueue::try_push(std::unique_ptr block, std::atomic_uint32_t& total_counter) { + bool need_block_sink = false; + { + LockGuard l(queue_lock); + if (is_finished) { + return false; + } + total_counter++; + bytes_in_queue += block->allocated_bytes(); + blocks.emplace_back(std::move(block)); + blocks_in_queue += 1; + need_block_sink = (static_cast(blocks.size()) > max_blocks_in_queue.load()); + } + // Notify outside of queue_lock to avoid nested locks. + if (need_block_sink) { + sink_dependency->block(); + } + return true; +} + +bool SubQueue::mark_finished(std::atomic_uint32_t& unfinished_counter, + std::atomic_bool& all_finished) { + LockGuard l(queue_lock); + if (is_finished) { + return false; + } + is_finished = true; + if (unfinished_counter.fetch_sub(1) == 1) { + all_finished = true; + } + return true; +} + +void SubQueue::clear_blocks() { + bool need_set_always_ready = false; + { + LockGuard l(queue_lock); + if (!blocks.empty()) { + blocks.clear(); + bytes_in_queue = 0; + blocks_in_queue = 0; + need_set_always_ready = true; + } + } + // Notify outside of queue_lock to keep lock ordering simple. 
+ if (need_set_always_ready) { + sink_dependency->set_always_ready(); + } +} + +DataQueue::DataQueue(int child_count) : _sub_queues(child_count), _child_count(child_count) { + for (auto& sub : _sub_queues) { + sub = std::make_unique(); } _un_finished_counter = child_count; - _sink_dependencies.resize(child_count, nullptr); +} + +bool DataQueue::has_more_data() const { + return _cur_blocks_total_nums.load() > 0; +} + +void DataQueue::set_source_dependency(std::shared_ptr source_dependency) + NO_THREAD_SAFETY_ANALYSIS { + _source_dependency = std::move(source_dependency); +} + +void DataQueue::set_sink_dependency(Dependency* sink_dependency, int child_idx) { + _sub_queues[child_idx]->sink_dependency = sink_dependency; +} + +void DataQueue::set_max_blocks_in_sub_queue(int64_t max_blocks) { + for (auto& sub : _sub_queues) { + sub->max_blocks_in_queue = max_blocks; + } +} + +void DataQueue::set_low_memory_mode() { + _is_low_memory_mode = true; + for (auto& sub : _sub_queues) { + sub->max_blocks_in_queue = 1; + } + clear_free_blocks(); } std::unique_ptr DataQueue::get_free_block(int child_idx) { + auto& sub = *_sub_queues[child_idx]; { - INJECT_MOCK_SLEEP(std::lock_guard l(*_free_blocks_lock[child_idx])); - if (!_free_blocks[child_idx].empty()) { - auto block = std::move(_free_blocks[child_idx].front()); - _free_blocks[child_idx].pop_front(); + LockGuard l(sub.free_lock); + if (!sub.free_blocks.empty()) { + auto block = std::move(sub.free_blocks.front()); + sub.free_blocks.pop_front(); return block; } } @@ -68,29 +149,24 @@ void DataQueue::push_free_block(std::unique_ptr block, int child_idx) { DCHECK(block->rows() == 0); if (!_is_low_memory_mode) { - INJECT_MOCK_SLEEP(std::lock_guard l(*_free_blocks_lock[child_idx])); - _free_blocks[child_idx].emplace_back(std::move(block)); + auto& sub = *_sub_queues[child_idx]; + LockGuard l(sub.free_lock); + sub.free_blocks.emplace_back(std::move(block)); } } void DataQueue::clear_free_blocks() { - for (size_t child_idx = 0; 
child_idx < _free_blocks.size(); ++child_idx) { - std::lock_guard l(*_free_blocks_lock[child_idx]); + for (auto& sub : _sub_queues) { + LockGuard l(sub->free_lock); std::deque> tmp_queue; - _free_blocks[child_idx].swap(tmp_queue); + sub->free_blocks.swap(tmp_queue); } } void DataQueue::terminate() { - for (int i = 0; i < _queue_blocks.size(); i++) { + for (int i = 0; i < _child_count; ++i) { set_finish(i); - INJECT_MOCK_SLEEP(std::lock_guard l(*_queue_blocks_lock[i])); - if (_cur_blocks_nums_in_queue[i] > 0) { - _queue_blocks[i].clear(); - _cur_bytes_in_queue[i] = 0; - _cur_blocks_nums_in_queue[i] = 0; - _sink_dependencies[i]->set_always_ready(); - } + _sub_queues[i]->clear_blocks(); } clear_free_blocks(); } @@ -105,7 +181,7 @@ bool DataQueue::remaining_has_data() { if (_flag_queue_idx == _child_count) { _flag_queue_idx = 0; } - if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) { + if (_sub_queues[_flag_queue_idx]->blocks_in_queue.load() > 0) { return true; } } @@ -115,28 +191,17 @@ bool DataQueue::remaining_has_data() { //the _flag_queue_idx indicate which queue has data, and in check can_read //will be set idx in remaining_has_data function Status DataQueue::get_block_from_queue(std::unique_ptr* output_block, int* child_idx) { - if (_is_canceled[_flag_queue_idx]) { - return Status::InternalError("Current queue of idx {} have beed canceled: ", - _flag_queue_idx); - } + const int idx = _flag_queue_idx; + auto& sub = *_sub_queues[idx]; - { - INJECT_MOCK_SLEEP(std::lock_guard l(*_queue_blocks_lock[_flag_queue_idx])); - if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) { - *output_block = std::move(_queue_blocks[_flag_queue_idx].front()); - _queue_blocks[_flag_queue_idx].pop_front(); - if (child_idx) { - *child_idx = _flag_queue_idx; - } - _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes(); - _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1; - if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) { - 
_sink_dependencies[_flag_queue_idx]->set_ready(); - } - auto old_value = _cur_blocks_total_nums.fetch_sub(1); - if (old_value == 1 && _source_dependency) { - set_source_block(); - } + sub.try_pop(output_block); + if (*output_block) { + if (child_idx) { + *child_idx = idx; + } + auto old_total = _cur_blocks_total_nums.fetch_sub(1); + if (old_total == 1) { + set_source_block(); } } return Status::OK(); @@ -146,70 +211,44 @@ Status DataQueue::push_block(std::unique_ptr block, int child_idx) { if (!block) { return Status::OK(); } - { - INJECT_MOCK_SLEEP(std::lock_guard l(*_queue_blocks_lock[child_idx])); - if (_is_finished[child_idx]) { - return Status::EndOfFile("Already finish"); - } - _cur_bytes_in_queue[child_idx] += block->allocated_bytes(); - _queue_blocks[child_idx].emplace_back(std::move(block)); - _cur_blocks_nums_in_queue[child_idx] += 1; - - if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) { - _sink_dependencies[child_idx]->block(); - } - _cur_blocks_total_nums++; - - set_source_ready(); + auto& sub = *_sub_queues[child_idx]; + // total_counter is incremented inside try_push under queue_lock, only when the + // block is actually enqueued. This ensures get_block_from_queue() always observes + // _cur_blocks_total_nums >= 1 when it successfully pops a block, with no risk of + // underflow or the need for a rollback on failure. 
+ if (!sub.try_push(std::move(block), _cur_blocks_total_nums)) { + return Status::EndOfFile("SubQueue already finished"); } + set_source_ready(); return Status::OK(); } void DataQueue::set_finish(int child_idx) { - INJECT_MOCK_SLEEP(std::lock_guard l(*_queue_blocks_lock[child_idx])); - if (_is_finished[child_idx]) { + auto& sub = *_sub_queues[child_idx]; + if (!sub.mark_finished(_un_finished_counter, _is_all_finished)) { return; } - _is_finished[child_idx] = true; - if (_un_finished_counter.fetch_sub(1) == 1) { - _is_all_finished = true; - } set_source_ready(); } -void DataQueue::set_canceled(int child_idx) { - INJECT_MOCK_SLEEP(std::lock_guard l(*_queue_blocks_lock[child_idx])); - DCHECK(!_is_finished[child_idx]); - _is_canceled[child_idx] = true; - _is_finished[child_idx] = true; - if (_un_finished_counter.fetch_sub(1) == 1) { - _is_all_finished = true; - } - set_source_ready(); -} - -bool DataQueue::is_finish(int child_idx) { - return _is_finished[child_idx]; -} - bool DataQueue::is_all_finish() { return _is_all_finished; } void DataQueue::set_source_ready() { + LockGuard lc(_source_lock); if (_source_dependency) { - std::unique_lock lc(_source_lock); _source_dependency->set_ready(); } } void DataQueue::set_source_block() { - if (_cur_blocks_total_nums == 0 && !is_all_finish()) { - std::unique_lock lc(_source_lock); - // Performing the judgment twice, attempting to avoid blocking the source as much as possible. - if (_cur_blocks_total_nums == 0 && !is_all_finish()) { - _source_dependency->block(); - } + // Re-check under _source_lock to avoid blocking the source when a concurrent push + // has already added new blocks (or all children have finished) since we observed + // the counter drop to zero. 
+ LockGuard lc(_source_lock); + if (_source_dependency && _cur_blocks_total_nums == 0 && !is_all_finish()) { + _source_dependency->block(); } } diff --git a/be/src/exec/operator/data_queue.h b/be/src/exec/operator/data_queue.h index a3c34d86a1e90b..0def08c05c4639 100644 --- a/be/src/exec/operator/data_queue.h +++ b/be/src/exec/operator/data_queue.h @@ -20,10 +20,10 @@ #include #include #include -#include #include #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "core/block/block.h" namespace doris { @@ -31,6 +31,52 @@ namespace doris { class Dependency; +// Per child sub-queue. Groups all parallel state so that the lock/field +// relationship is explicit and can be checked by clang -Wthread-safety. +struct SubQueue { + // Protects the `blocks` deque and serializes high-level state + // transitions (push/pop/finish/cancel) on this sub-queue. + AnnotatedMutex queue_lock; + std::deque> blocks GUARDED_BY(queue_lock); + + // The following fields are only accessed while holding queue_lock. + int64_t bytes_in_queue GUARDED_BY(queue_lock) = 0; + bool is_finished GUARDED_BY(queue_lock) = false; + + // Protects the `free_blocks` deque only. + AnnotatedMutex free_lock; + std::deque> free_blocks GUARDED_BY(free_lock); + + // blocks_in_queue is readable from lock-free fast paths (remaining_has_data), + // so it remains atomic and is intentionally not GUARDED_BY. + std::atomic_uint32_t blocks_in_queue {0}; + + // Maximum number of blocks allowed in this sub-queue before the sink is blocked. + // Updated by DataQueue::set_max_blocks_in_sub_queue / set_low_memory_mode. + std::atomic_int64_t max_blocks_in_queue {1}; + + // Set once during init via set_sink_dependency, then read-only. + Dependency* sink_dependency = nullptr; + + // Pop a block under queue_lock. + // Notifies sink_dependency->set_ready() (outside the lock) if the queue becomes empty. + // output_block is null if the queue was empty. 
+ void try_pop(std::unique_ptr* output_block); + + // Push a block under queue_lock and atomically increment total_counter. + // Returns false (without incrementing) if already finished. + // Calls sink_dependency->block() (outside the lock) if the queue exceeds max_blocks_in_queue. + bool try_push(std::unique_ptr block, std::atomic_uint32_t& total_counter); + + // Mark this sub-queue finished. Returns false if already finished (idempotent). + // Decrements unfinished_counter and may set all_finished within queue_lock. + bool mark_finished(std::atomic_uint32_t& unfinished_counter, std::atomic_bool& all_finished); + + // Clear all pending blocks under queue_lock. + // Calls sink_dependency->set_always_ready() (outside the lock) if any blocks were cleared. + void clear_blocks(); +}; + class DataQueue { public: //always one is enough, but in union node it's has more children @@ -38,64 +84,37 @@ class DataQueue { ~DataQueue() = default; Status get_block_from_queue(std::unique_ptr* block, int* child_idx = nullptr); - Status push_block(std::unique_ptr block, int child_idx = 0); std::unique_ptr get_free_block(int child_idx = 0); - void push_free_block(std::unique_ptr output_block, int child_idx = 0); - void clear_free_blocks(); - void set_finish(int child_idx = 0); - void set_canceled(int child_idx = 0); // should set before finish - bool is_finish(int child_idx = 0); bool is_all_finish(); // This function is not thread safe, should be called in Operator::get_block() bool remaining_has_data(); + bool has_more_data() const; - bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; } - - int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; } - int64_t max_size_of_queue() const { return _max_size_of_queue; } - - void set_source_dependency(std::shared_ptr source_dependency) { - _source_dependency = source_dependency; - } - void set_sink_dependency(Dependency* sink_dependency, int child_idx) { - _sink_dependencies[child_idx] = sink_dependency; - } - 
- void set_source_ready(); - void set_source_block(); - - void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; } - - void set_low_memory_mode() { - _is_low_memory_mode = true; - _max_blocks_in_sub_queue = 1; - clear_free_blocks(); - } + void set_source_dependency(std::shared_ptr source_dependency) + NO_THREAD_SAFETY_ANALYSIS; + void set_sink_dependency(Dependency* sink_dependency, int child_idx); + void set_max_blocks_in_sub_queue(int64_t max_blocks); + void set_low_memory_mode(); void terminate(); private: - std::vector> _queue_blocks_lock; - std::vector>> _queue_blocks; + void clear_free_blocks(); + void set_source_ready(); + void set_source_block(); - std::vector> _free_blocks_lock; - std::vector>> _free_blocks; + std::vector> _sub_queues; //how many deque will be init, always will be one int _child_count = 0; - std::vector _is_finished; - std::atomic_uint32_t _un_finished_counter; + std::atomic_uint32_t _un_finished_counter = 0; std::atomic_bool _is_all_finished = false; - std::vector _is_canceled; - // int64_t just for counter of profile - std::vector _cur_bytes_in_queue; - std::vector _cur_blocks_nums_in_queue; std::atomic_uint32_t _cur_blocks_total_nums = 0; //this will be indicate which queue has data, it's useful when have many queues @@ -103,17 +122,11 @@ class DataQueue { // only used by streaming agg source operator std::atomic_bool _is_low_memory_mode = false; - std::atomic_int64_t _max_blocks_in_sub_queue = 1; - - //this only use to record the queue[0] for profile - int64_t _max_bytes_in_queue = 0; - int64_t _max_size_of_queue = 0; - static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10; - // data queue is multi sink one source - std::shared_ptr _source_dependency = nullptr; - std::vector _sink_dependencies; - std::mutex _source_lock; + // _source_dependency is written once during initialization (set_source_dependency) + // and read/used only while holding _source_lock thereafter. 
+ std::shared_ptr _source_dependency GUARDED_BY(_source_lock) = nullptr; + AnnotatedMutex _source_lock; }; #include "common/compile_check_end.h" diff --git a/be/test/exec/pipeline/data_queue_test.cpp b/be/test/exec/pipeline/data_queue_test.cpp index 07ef77236325aa..f8ba26bc664d01 100644 --- a/be/test/exec/pipeline/data_queue_test.cpp +++ b/be/test/exec/pipeline/data_queue_test.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include "core/data_type/data_type_number.h" @@ -27,6 +28,173 @@ namespace doris { +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +static std::unique_ptr make_block(size_t rows = 1) { + auto block = std::make_unique(); + auto col = ColumnUInt8::create(rows); + block->insert(ColumnWithTypeAndName {std::move(col), std::make_shared(), ""}); + return block; +} + +// Create a Dependency that starts ready. +static std::shared_ptr make_dep(bool initially_ready = true) { + return Dependency::create_shared(1, 1, "Test", initially_ready); +} + +// --------------------------------------------------------------------------- +// SubQueue tests +// --------------------------------------------------------------------------- + +class SubQueueTest : public testing::Test { +public: + void SetUp() override { + dep = make_dep(/*initially_ready=*/true); + sub = std::make_unique(); + sub->sink_dependency = dep.get(); + sub->max_blocks_in_queue = 2; + } + + std::shared_ptr dep; + std::unique_ptr sub; + std::atomic_uint32_t counter_ {0}; +}; + +// Pop from an empty queue returns OK with null output. +TEST_F(SubQueueTest, TryPopEmpty) { + std::unique_ptr out; + sub->try_pop(&out); + EXPECT_EQ(out, nullptr); + EXPECT_EQ(sub->blocks_in_queue.load(), 0u); +} + +// Basic push then pop returns the block. 
+TEST_F(SubQueueTest, TryPushPop_Basic) { + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_EQ(sub->blocks_in_queue.load(), 1u); + + std::unique_ptr out; + sub->try_pop(&out); + EXPECT_NE(out, nullptr); + EXPECT_EQ(sub->blocks_in_queue.load(), 0u); +} + +// push after mark_finished returns EndOfFile. +TEST_F(SubQueueTest, TryPushAfterFinished) { + std::atomic_uint32_t counter {1}; + std::atomic_bool all_done {false}; + sub->mark_finished(counter, all_done); + + EXPECT_FALSE(sub->try_push(make_block(), counter_)); +} + +// When blocks.size() exceeds max_blocks_in_queue, sink is blocked. +TEST_F(SubQueueTest, SinkBlockedWhenFull) { + sub->max_blocks_in_queue = 2; + dep->set_ready(); // start ready + + // Push up to the limit — sink should stay ready. + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_TRUE(dep->ready()); + + // Push one over the limit — sink should be blocked. + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_FALSE(dep->ready()); +} + +// Sink wakes up only when the queue becomes completely empty. +TEST_F(SubQueueTest, SinkReadyWhenQueueEmpty) { + sub->max_blocks_in_queue = 2; + + // Fill to 3 (one over limit) → sink blocked. + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_FALSE(dep->ready()); + + // Pop 1 & 2: queue not empty yet → sink still blocked. + std::unique_ptr out; + sub->try_pop(&out); + EXPECT_NE(out, nullptr); + EXPECT_FALSE(dep->ready()); + + sub->try_pop(&out); + EXPECT_NE(out, nullptr); + EXPECT_FALSE(dep->ready()); + + // Pop 3: queue empty → set_ready(). + sub->try_pop(&out); + EXPECT_NE(out, nullptr); + EXPECT_TRUE(dep->ready()); +} + +// mark_finished is idempotent: second call returns false and counter stays correct. 
+TEST_F(SubQueueTest, MarkFinishedIdempotent) { + std::atomic_uint32_t counter {2}; + std::atomic_bool all_done {false}; + + EXPECT_TRUE(sub->mark_finished(counter, all_done)); + EXPECT_EQ(counter.load(), 1u); + EXPECT_FALSE(all_done.load()); + + EXPECT_FALSE(sub->mark_finished(counter, all_done)); + EXPECT_EQ(counter.load(), 1u); // unchanged +} + +// mark_finished sets all_finished when last child finishes. +TEST_F(SubQueueTest, MarkFinishedSetsAllFinished) { + std::atomic_uint32_t counter {1}; + std::atomic_bool all_done {false}; + sub->mark_finished(counter, all_done); + EXPECT_TRUE(all_done.load()); +} + +// clear_blocks empties the queue and calls set_always_ready on sink. +TEST_F(SubQueueTest, ClearBlocksEmptiesQueue) { + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_TRUE(sub->try_push(make_block(), counter_)); + EXPECT_EQ(sub->blocks_in_queue.load(), 2u); + + sub->clear_blocks(); + + EXPECT_EQ(sub->blocks_in_queue.load(), 0u); + // set_always_ready was called → dep is always ready. + EXPECT_TRUE(dep->ready()); +} + +// clear_blocks on an empty queue is a no-op (set_always_ready not called). +TEST_F(SubQueueTest, ClearBlocksNoop) { + dep->block(); // start blocked + sub->clear_blocks(); + EXPECT_FALSE(dep->ready()); // still blocked — clear_blocks did nothing +} + +// bytes_in_queue tracks push/pop correctly. 
+TEST_F(SubQueueTest, BytesInQueueTracking) { + auto block = make_block(10); + int64_t expected_bytes = block->allocated_bytes(); + + EXPECT_TRUE(sub->try_push(std::move(block), counter_)); + { + LockGuard l(sub->queue_lock); + EXPECT_EQ(sub->bytes_in_queue, expected_bytes); + } + + std::unique_ptr out; + sub->try_pop(&out); + { + LockGuard l(sub->queue_lock); + EXPECT_EQ(sub->bytes_in_queue, 0); + } +} + +// --------------------------------------------------------------------------- +// DataQueue fixtures +// --------------------------------------------------------------------------- + class DataQueueTest : public testing::Test { public: DataQueueTest() = default; @@ -47,6 +215,140 @@ class DataQueueTest : public testing::Test { const int child_count = 3; }; +// --------------------------------------------------------------------------- +// DataQueue unit tests +// --------------------------------------------------------------------------- + +// Initial state: no data, no finish. +TEST_F(DataQueueTest, InitialState) { + EXPECT_FALSE(data_queue->has_more_data()); + EXPECT_FALSE(data_queue->is_all_finish()); + EXPECT_FALSE(data_queue->remaining_has_data()); +} + +// Push one block and retrieve it. +TEST_F(DataQueueTest, SinglePushPop) { + EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok()); + EXPECT_TRUE(data_queue->has_more_data()); + + // Find the queue with data. + EXPECT_TRUE(data_queue->remaining_has_data()); + + std::unique_ptr out; + int child_idx = -1; + EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok()); + EXPECT_NE(out, nullptr); + EXPECT_EQ(child_idx, 0); + EXPECT_FALSE(data_queue->has_more_data()); +} + +// is_all_finish only becomes true after all children call set_finish. 
+TEST_F(DataQueueTest, IsAllFinishAfterAllChildren) { + data_queue->set_finish(0); + EXPECT_FALSE(data_queue->is_all_finish()); + data_queue->set_finish(1); + EXPECT_FALSE(data_queue->is_all_finish()); + data_queue->set_finish(2); + EXPECT_TRUE(data_queue->is_all_finish()); +} + +// set_finish is idempotent. +TEST_F(DataQueueTest, SetFinishIdempotent) { + data_queue->set_finish(0); + data_queue->set_finish(0); // second call must not double-decrement + data_queue->set_finish(1); + data_queue->set_finish(2); + EXPECT_TRUE(data_queue->is_all_finish()); +} + +// child_idx returned by get_block_from_queue reflects the actual queue. +TEST_F(DataQueueTest, ChildIdxReturned) { + // Push to child 1 only. + EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok()); + data_queue->remaining_has_data(); // advance _flag_queue_idx to find child 1 + + std::unique_ptr out; + int child_idx = -1; + EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok()); + EXPECT_NE(out, nullptr); + EXPECT_EQ(child_idx, 1); +} + +// get_free_block returns a new block when free list is empty, reuses when not. +TEST_F(DataQueueTest, FreeBlockReuse) { + // First call: allocates a new block. + auto block = data_queue->get_free_block(0); + EXPECT_NE(block, nullptr); + + // Return it to the free list. + block->clear(); // ensure rows == 0 + data_queue->push_free_block(std::move(block), 0); + + // Second call: must return the recycled block. + auto block2 = data_queue->get_free_block(0); + EXPECT_NE(block2, nullptr); +} + +// In low-memory mode push_free_block discards blocks and max drops to 1. +TEST_F(DataQueueTest, LowMemoryMode) { + // Pre-populate the free list. + data_queue->push_free_block(Block::create_unique(), 0); + + data_queue->set_low_memory_mode(); + + // Free list must be cleared. + auto block = data_queue->get_free_block(0); + // The free list is empty → a fresh block is returned (not from the list). + EXPECT_NE(block, nullptr); + + // push_free_block now discards. 
+ block->clear(); + data_queue->push_free_block(std::move(block), 0); + auto block2 = data_queue->get_free_block(0); + // Still gets a fresh allocation (free list stays empty). + EXPECT_NE(block2, nullptr); +} + +// terminate() finishes all children and clears pending blocks from sub-queues. +TEST_F(DataQueueTest, Terminate) { + EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok()); + EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok()); + + data_queue->terminate(); + + EXPECT_TRUE(data_queue->is_all_finish()); + // remaining_has_data() checks blocks_in_queue per sub-queue, + // which clear_blocks() resets to 0. + EXPECT_FALSE(data_queue->remaining_has_data()); +} + +// set_max_blocks_in_sub_queue propagates to every sub-queue. +TEST_F(DataQueueTest, SetMaxBlocksInSubQueue) { + data_queue->set_max_blocks_in_sub_queue(5); + // Push 5 blocks to child 0 — sink must stay ready (not over the limit yet). + for (int i = 0; i < 5; i++) { + EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok()); + } + EXPECT_TRUE(sink_deps[0]->ready()); + + // 6th push exceeds limit → sink blocked. + EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok()); + EXPECT_FALSE(sink_deps[0]->ready()); +} + +// Source dependency is notified ready when a block is pushed. 
+TEST_F(DataQueueTest, SourceReadyOnPush) { + source_dep->block(); // start blocked + EXPECT_FALSE(source_dep->ready()); + + EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok()); + EXPECT_TRUE(source_dep->ready()); +} + +// --------------------------------------------------------------------------- +// Multi-threaded integration test (existing) +// --------------------------------------------------------------------------- + TEST_F(DataQueueTest, MultiTest) { int output_count = 0; auto output_func = [&]() { @@ -107,14 +409,7 @@ TEST_F(DataQueueTest, MultiTest) { output1.join(); EXPECT_EQ(output_count, 150); - for (int i = 0; i < 3; i++) { - EXPECT_TRUE(data_queue->is_finish(i)); - } EXPECT_TRUE(data_queue->is_all_finish()); - data_queue->clear_free_blocks(); - for (int i = 0; i < 3; i++) { - EXPECT_TRUE(data_queue->_free_blocks[i].empty()); - } } // ./run-be-ut.sh --run --filter=DataQueueTest.* From 49f7a2591dd3f7765299afcaf06426d3b0ae7d49 Mon Sep 17 00:00:00 2001 From: Mryange Date: Thu, 7 May 2026 21:03:37 +0800 Subject: [PATCH 2/3] [fix](pipeline) avoid data queue sink dependency lost wakeup (#63055) ### What problem does this PR solve? Issue Number: N/A Problem Summary: `DataQueueTest.MultiTest` could intermittently hang after DataQueue moved sink dependency notifications outside the per-sub-queue lock. Root cause: `SubQueue` queue state and `sink_dependency` state were no longer serialized by `queue_lock`, so a producer could observe its sink dependency as blocked even after the queue had already become empty, leaving no future push/pop to wake it. This patch updates `sink_dependency->set_ready()` and `sink_dependency->block()` while holding `queue_lock`, keeping queue occupancy and sink readiness transitions atomic with respect to each other. 
Related PR: https://github.com/apache/doris/pull/62947 (cherry picked from commit 17bbba45a52e6e6ace99773930f83dc1569897f7) --- be/src/exec/operator/data_queue.cpp | 44 +++++++++++------------------ 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/be/src/exec/operator/data_queue.cpp b/be/src/exec/operator/data_queue.cpp index 4147a5e6da37ff..b41f342ba07a2f 100644 --- a/be/src/exec/operator/data_queue.cpp +++ b/be/src/exec/operator/data_queue.cpp @@ -30,38 +30,28 @@ namespace doris { #include "common/compile_check_begin.h" void SubQueue::try_pop(std::unique_ptr* output_block) { - bool need_notify_sink_ready = false; - { - LockGuard l(queue_lock); - if (!blocks.empty()) { - *output_block = std::move(blocks.front()); - blocks.pop_front(); - bytes_in_queue -= (*output_block)->allocated_bytes(); - blocks_in_queue -= 1; - need_notify_sink_ready = blocks.empty(); + LockGuard l(queue_lock); + if (!blocks.empty()) { + *output_block = std::move(blocks.front()); + blocks.pop_front(); + bytes_in_queue -= (*output_block)->allocated_bytes(); + blocks_in_queue -= 1; + if (blocks.empty()) { + sink_dependency->set_ready(); } } - // Notify outside of queue_lock to avoid nested locks. - if (need_notify_sink_ready) { - sink_dependency->set_ready(); - } } bool SubQueue::try_push(std::unique_ptr block, std::atomic_uint32_t& total_counter) { - bool need_block_sink = false; - { - LockGuard l(queue_lock); - if (is_finished) { - return false; - } - total_counter++; - bytes_in_queue += block->allocated_bytes(); - blocks.emplace_back(std::move(block)); - blocks_in_queue += 1; - need_block_sink = (static_cast(blocks.size()) > max_blocks_in_queue.load()); - } - // Notify outside of queue_lock to avoid nested locks. 
- if (need_block_sink) { + LockGuard l(queue_lock); + if (is_finished) { + return false; + } + total_counter++; + bytes_in_queue += block->allocated_bytes(); + blocks.emplace_back(std::move(block)); + blocks_in_queue += 1; + if (static_cast(blocks.size()) > max_blocks_in_queue.load()) { sink_dependency->block(); } return true; From f94e6c20f6a645bac3f62b6d6ead155dcfe91a51 Mon Sep 17 00:00:00 2001 From: Mryange Date: Sat, 9 May 2026 09:57:11 +0800 Subject: [PATCH 3/3] [refine](exec/operator) replace std::mutex/std::lock_guard with annotated wrappers for thread safety analysis (#63070) This PR introduces Clang thread safety annotations (-Wthread-safety) to pipeline operator shared states by replacing raw std::mutex/std::lock_guard/std::unique_lock with annotated wrappers (AnnotatedMutex, LockGuard, UniqueLock), and by decorating guarded member variables with GUARDED_BY attributes. This enables the compiler to statically detect data races where a field is accessed without holding its associated mutex. The change also fixes two bugs uncovered during the annotation process: - MultiCastDataStreamer::push: _eos (member) was checked instead of eos (parameter), causing the "set always ready" branch to fire on the prior call's stale state rather than the current one. - MultiCastDataStreamer::pull's spill lambda: _cached_blocks[sender_idx].empty() was checked outside the mutex; the check is now done via a boolean captured inside the lock. 
(cherry picked from commit 2dc0d0a8c81975d9c270ae72041f14c533b0c7fb) --- be/src/common/thread_safety_annotations.h | 51 +++++++++++++++++++ .../exec/operator/analytic_sink_operator.cpp | 4 +- .../operator/analytic_source_operator.cpp | 9 ++-- .../exec/operator/exchange_sink_operator.cpp | 3 +- be/src/exec/operator/exchange_sink_operator.h | 6 +-- be/src/exec/operator/hashjoin_build_sink.cpp | 8 +-- be/src/exec/operator/hashjoin_build_sink.h | 5 +- .../operator/multi_cast_data_streamer.cpp | 19 ++++--- .../exec/operator/multi_cast_data_streamer.h | 19 +++---- .../operator/partition_sort_sink_operator.cpp | 10 ++-- .../partition_sort_source_operator.cpp | 9 ++-- be/src/exec/operator/scan_operator.cpp | 4 +- be/src/exec/operator/scan_operator.h | 4 +- be/src/exec/pipeline/dependency.h | 19 +++---- .../multi_cast_data_streamer_test.cpp | 17 ++++--- 15 files changed, 122 insertions(+), 65 deletions(-) diff --git a/be/src/common/thread_safety_annotations.h b/be/src/common/thread_safety_annotations.h index 41c50711db3e0c..6cd8d4b0cae45c 100644 --- a/be/src/common/thread_safety_annotations.h +++ b/be/src/common/thread_safety_annotations.h @@ -118,3 +118,54 @@ class SCOPED_CAPABILITY LockGuard { private: MutexType& _mu; }; + +// RAII unique lock annotated for thread safety analysis. +// Supports manual lock/unlock while preserving capability tracking. 
+template +class SCOPED_CAPABILITY UniqueLock { +public: + explicit UniqueLock(MutexType& mu) ACQUIRE(mu) : _mu(&mu), _locked(true) { +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + _mu->lock(); + } + + UniqueLock(MutexType& mu, std::adopt_lock_t) REQUIRES(mu) : _mu(&mu), _locked(true) {} + + UniqueLock(MutexType& mu, std::defer_lock_t) EXCLUDES(mu) : _mu(&mu), _locked(false) {} + + ~UniqueLock() RELEASE() { + if (_locked) { + _mu->unlock(); +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + } + } + + void lock() ACQUIRE() { +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + _mu->lock(); + _locked = true; + } + + void unlock() RELEASE() { + _mu->unlock(); + _locked = false; +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + } + + bool owns_lock() const { return _locked; } + + UniqueLock(const UniqueLock&) = delete; + UniqueLock& operator=(const UniqueLock&) = delete; + +private: + MutexType* _mu; + bool _locked; +}; diff --git a/be/src/exec/operator/analytic_sink_operator.cpp b/be/src/exec/operator/analytic_sink_operator.cpp index db5ac2ecd37e15..7a6b0d659c0dcd 100644 --- a/be/src/exec/operator/analytic_sink_operator.cpp +++ b/be/src/exec/operator/analytic_sink_operator.cpp @@ -461,7 +461,7 @@ void AnalyticSinkLocalState::_init_result_columns() { void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(Block* block) { size_t buffer_size = 0; { - std::unique_lock lc(_shared_state->buffer_mutex); + LockGuard lc(_shared_state->buffer_mutex); _shared_state->blocks_buffer.push(std::move(*block)); buffer_size = _shared_state->blocks_buffer.size(); } @@ -756,7 +756,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block* input_bloc RETURN_IF_ERROR(_add_input_block(state, input_block)); RETURN_IF_ERROR(local_state._execute_impl()); if (local_state._input_eos) { - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); local_state._shared_state->sink_eos = 
true; local_state._dependency->set_ready_to_read(); // ready for source to read } diff --git a/be/src/exec/operator/analytic_source_operator.cpp b/be/src/exec/operator/analytic_source_operator.cpp index de385e3dedee24..efdb3055ff6201 100644 --- a/be/src/exec/operator/analytic_source_operator.cpp +++ b/be/src/exec/operator/analytic_source_operator.cpp @@ -53,7 +53,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo output_block->clear_column_data(); size_t output_rows = 0; { - std::lock_guard lock(local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); if (!local_state._shared_state->blocks_buffer.empty()) { local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); @@ -61,11 +61,10 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo //if buffer have no data and sink not eos, block reading and wait for signal again RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block, output_block->columns())); - if (local_state._shared_state->blocks_buffer.empty() && - !local_state._shared_state->sink_eos) { + if (local_state._shared_state->blocks_buffer.empty()) { // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. 
// so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); if (!local_state._shared_state->sink_eos) { local_state._dependency->block(); // block self source local_state._dependency->set_ready_to_write(); // ready for sink write @@ -73,7 +72,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo } } else { //iff buffer have no data and sink eos, set eos - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); *eos = local_state._shared_state->sink_eos; } } diff --git a/be/src/exec/operator/exchange_sink_operator.cpp b/be/src/exec/operator/exchange_sink_operator.cpp index ffbe0780d87028..14061d610c4788 100644 --- a/be/src/exec/operator/exchange_sink_operator.cpp +++ b/be/src/exec/operator/exchange_sink_operator.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include @@ -217,7 +216,7 @@ void ExchangeSinkLocalState::_create_channels() { } void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) { - std::lock_guard lock(_finished_channels_mutex); + LockGuard lock(_finished_channels_mutex); if (_finished_channels.contains(channel_id)) { LOG(WARNING) << "Query: " << print_id(_state->query_id()) diff --git a/be/src/exec/operator/exchange_sink_operator.h b/be/src/exec/operator/exchange_sink_operator.h index 9824e3aaf8879d..369f03ec6bfcc9 100644 --- a/be/src/exec/operator/exchange_sink_operator.h +++ b/be/src/exec/operator/exchange_sink_operator.h @@ -22,9 +22,9 @@ #include #include #include -#include #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "exec/exchange/exchange_writer.h" #include "exec/exchange/vdata_stream_sender.h" #include "exec/operator/exchange_sink_buffer.h" @@ -180,8 +180,8 @@ class ExchangeSinkLocalState 
MOCK_REMOVE(final) : public PipelineXSinkLocalState int _last_local_channel_idx = -1; std::atomic_int _working_channels_count = 0; - std::set _finished_channels; - std::mutex _finished_channels_mutex; + std::set _finished_channels GUARDED_BY(_finished_channels_mutex); + AnnotatedMutex _finished_channels_mutex; }; class ExchangeSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX { diff --git a/be/src/exec/operator/hashjoin_build_sink.cpp b/be/src/exec/operator/hashjoin_build_sink.cpp index fa473a1ea2a9f6..d2d7dcb42c5b58 100644 --- a/be/src/exec/operator/hashjoin_build_sink.cpp +++ b/be/src/exec/operator/hashjoin_build_sink.cpp @@ -67,7 +67,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _dependency->block(); _finish_dependency->block(); { - std::lock_guard guard(p._mutex); + LockGuard guard(p._mutex); p._finish_dependencies.push_back(_finish_dependency); } } else { @@ -243,10 +243,12 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } if (p._use_shared_hash_table) { - std::unique_lock lock(p._mutex); + LockGuard lock(p._mutex); // Only signal non-builder tasks when the builder actually built the hash table. // When the builder is terminated early, process_build_block() has not initialized the - // shared hash table or runtime filter wrappers, so non-builders must return EOF. + // shared hash table or runtime filter wrappers. In that case the hash table variant is + // still monostate, so signaling non-builders would make them enter std::visit on + // monostate and crash with "Hash table type mismatch". 
if (!_terminated) { p._signaled = true; } diff --git a/be/src/exec/operator/hashjoin_build_sink.h b/be/src/exec/operator/hashjoin_build_sink.h index c6f492e8df743c..8bde64603cef29 100644 --- a/be/src/exec/operator/hashjoin_build_sink.h +++ b/be/src/exec/operator/hashjoin_build_sink.h @@ -17,6 +17,7 @@ #pragma once +#include "common/thread_safety_annotations.h" #include "exec/operator/join_build_sink_operator.h" #include "exec/operator/operator.h" #include "exec/runtime_filter/runtime_filter_producer_helper.h" @@ -197,8 +198,8 @@ class HashJoinBuildSinkOperatorX MOCK_REMOVE(final) bool _use_shared_hash_table = false; std::atomic _signaled = false; - std::mutex _mutex; - std::vector> _finish_dependencies; + AnnotatedMutex _mutex; + std::vector> _finish_dependencies GUARDED_BY(_mutex); std::map> _runtime_filters; }; diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp b/be/src/exec/operator/multi_cast_data_streamer.cpp index 142f78003384d1..da3129b305c5d8 100644 --- a/be/src/exec/operator/multi_cast_data_streamer.cpp +++ b/be/src/exec/operator/multi_cast_data_streamer.cpp @@ -48,10 +48,11 @@ MultiCastBlock::MultiCastBlock(Block* block, int un_finish_copy, size_t mem_size block->clear(); } -Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* block, bool* eos) { +Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* block, + bool* eos) NO_THREAD_SAFETY_ANALYSIS { MultiCastBlock* multi_cast_block = nullptr; { - INJECT_MOCK_SLEEP(std::unique_lock l(_mutex)); + UniqueLock l(_mutex); for (auto it = _spill_readers[sender_idx].begin(); it != _spill_readers[sender_idx].end();) { if ((*it)->all_data_read) { @@ -93,13 +94,15 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b auto spill_func = [this, reader_item, sender_idx]() { Block block; bool spill_eos = false; + bool has_cached_blocks = false; size_t read_size = 0; while (!spill_eos) { 
RETURN_IF_ERROR(reader_item->reader->read(&block, &spill_eos)); if (!block.empty()) { - std::lock_guard l(_mutex); + LockGuard l(_mutex); read_size += block.allocated_bytes(); _cached_blocks[sender_idx].emplace_back(std::move(block)); + has_cached_blocks = true; if (_cached_blocks[sender_idx].size() >= 32 || read_size > 2 * 1024 * 1024) { break; @@ -107,7 +110,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b } } - if (spill_eos || !_cached_blocks[sender_idx].empty()) { + if (spill_eos || has_cached_blocks) { reader_item->all_data_read = spill_eos; _set_ready_for_read(sender_idx); } @@ -159,7 +162,7 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_id block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows); } - INJECT_MOCK_SLEEP(std::lock_guard l(_mutex)); + LockGuard l(_mutex); multi_cast_block._un_finish_copy--; auto copying_count = _copying_count.fetch_sub(1) - 1; if (multi_cast_block._un_finish_copy == 0) { @@ -293,7 +296,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, boo const auto block_mem_size = block->allocated_bytes(); { - INJECT_MOCK_SLEEP(std::lock_guard l(_mutex)); + LockGuard l(_mutex); if (_pending_block) { DCHECK_GT(_pending_block->rows(), 0); const auto pending_size = _pending_block->allocated_bytes(); @@ -346,7 +349,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, boo _eos = eos; } - if (_eos) { + if (eos) { for (auto* read_dep : _dependencies) { read_dep->set_always_ready(); } @@ -377,7 +380,7 @@ std::string MultiCastDataStreamer::debug_string() { size_t pos_at_end_count = 0; size_t blocks_count = 0; { - std::unique_lock l(_mutex); + LockGuard l(_mutex); blocks_count = _multi_cast_blocks.size(); for (int32_t i = 0; i != _cast_sender_count; ++i) { if (!_dependencies[i]->is_blocked_by()) { diff --git a/be/src/exec/operator/multi_cast_data_streamer.h 
b/be/src/exec/operator/multi_cast_data_streamer.h index 773576fb2b932e..8d62ca78084946 100644 --- a/be/src/exec/operator/multi_cast_data_streamer.h +++ b/be/src/exec/operator/multi_cast_data_streamer.h @@ -22,6 +22,7 @@ #include #include +#include "common/thread_safety_annotations.h" #include "core/block/block.h" #include "exec/exchange/vdata_stream_sender.h" #include "exec/pipeline/dependency.h" @@ -100,16 +101,16 @@ class MultiCastDataStreamer { Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block, MultiCastBlock& multi_cast_block); - Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file); + Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file) REQUIRES(_mutex); - Status _trigger_spill_if_need(RuntimeState* state, bool* triggered); + Status _trigger_spill_if_need(RuntimeState* state, bool* triggered) REQUIRES(_mutex); RuntimeProfile* _profile = nullptr; - std::list _multi_cast_blocks; - std::vector> _cached_blocks; - std::vector::iterator> _sender_pos_to_read; - std::mutex _mutex; - bool _eos = false; + std::list _multi_cast_blocks GUARDED_BY(_mutex); + std::vector> _cached_blocks GUARDED_BY(_mutex); + std::vector::iterator> _sender_pos_to_read GUARDED_BY(_mutex); + AnnotatedMutex _mutex; + bool _eos GUARDED_BY(_mutex) = false; int _cast_sender_count = 0; int _node_id; std::atomic_int64_t _cumulative_mem_size = 0; @@ -120,9 +121,9 @@ class MultiCastDataStreamer { Dependency* _write_dependency; std::vector _dependencies; - BlockUPtr _pending_block; + BlockUPtr _pending_block GUARDED_BY(_mutex); - std::vector>> _spill_readers; + std::vector>> _spill_readers GUARDED_BY(_mutex); RuntimeProfile* _sink_operator_profile; // operator_profile of each source operator diff --git a/be/src/exec/operator/partition_sort_sink_operator.cpp b/be/src/exec/operator/partition_sort_sink_operator.cpp index 0f605b70d0d74d..66ed84d021e18b 100644 --- a/be/src/exec/operator/partition_sort_sink_operator.cpp +++ 
b/be/src/exec/operator/partition_sort_sink_operator.cpp @@ -128,7 +128,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block, if (local_state._is_need_passthrough) { { COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows); - std::lock_guard lock(local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); local_state._shared_state->blocks_buffer.push(std::move(*input_block)); // buffer have data, source could read this. local_state._dependency->set_ready_to_read(); @@ -159,8 +159,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block, } local_state._value_places[i]->_blocks.clear(); RETURN_IF_ERROR(sorter->prepare_for_read(false)); - INJECT_MOCK_SLEEP(std::unique_lock lc( - local_state._shared_state->prepared_finish_lock)); + LockGuard lc(local_state._shared_state->prepared_finish_lock); sorter->set_prepared_finish(); // iff one sorter have data, then could set source ready to read local_state._dependency->set_ready_to_read(); @@ -171,7 +170,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block, local_state._sorted_partition_input_rows); //so all data from child have sink completed { - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); local_state._shared_state->sink_eos = true; // this ready is also need, as source maybe block by self in some case local_state._dependency->set_ready_to_read(); @@ -262,8 +261,7 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table( { COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)(row + 1)); - std::lock_guard lock( - local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); // have emplace (num_rows - row) to hashtable, and now have row remaining needed in block; // set_num_rows(x) retains the range [0, x - 1], so row + 1 is needed here. 
input_block->set_num_rows(row + 1); diff --git a/be/src/exec/operator/partition_sort_source_operator.cpp b/be/src/exec/operator/partition_sort_source_operator.cpp index eeaf4683c5a7e1..2e3dad1d32afe4 100644 --- a/be/src/exec/operator/partition_sort_source_operator.cpp +++ b/be/src/exec/operator/partition_sort_source_operator.cpp @@ -41,17 +41,16 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, Block* outpu output_block->clear_column_data(); auto get_data_from_blocks_buffer = false; { - std::lock_guard lock(local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); get_data_from_blocks_buffer = !local_state._shared_state->blocks_buffer.empty(); if (get_data_from_blocks_buffer) { local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); - if (local_state._shared_state->blocks_buffer.empty() && - !local_state._shared_state->sink_eos) { + if (local_state._shared_state->blocks_buffer.empty()) { // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. 
// so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); //if buffer have no data and sink not eos, block reading and wait for signal again if (!local_state._shared_state->sink_eos) { local_state._dependency->block(); @@ -94,7 +93,7 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, Block if (current_eos) { // current sort have eos, so get next idx local_state._sort_idx++; - std::unique_lock lc(local_state._shared_state->prepared_finish_lock); + LockGuard lc(local_state._shared_state->prepared_finish_lock); if (local_state._sort_idx < sorter_size && !sorters[local_state._sort_idx]->prepared_finish()) { local_state._dependency->block(); diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp index 3d34d32ceb07ef..87c02aab2b682f 100644 --- a/be/src/exec/operator/scan_operator.cpp +++ b/be/src/exec/operator/scan_operator.cpp @@ -75,7 +75,7 @@ bool ScanLocalState::should_run_serial() const { Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num) { // Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads - std::unique_lock lock(_conjuncts_lock); + LockGuard lock(_conjuncts_lock); RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(), arrived_rf_num, _conjuncts)); if (state->enable_adjust_conjunct_order_by_cost()) { @@ -88,7 +88,7 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat Status ScanLocalStateBase::clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts) { // Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads - std::unique_lock lock(_conjuncts_lock); + LockGuard lock(_conjuncts_lock); scanner_conjuncts.resize(_conjuncts.size()); 
for (size_t i = 0; i != _conjuncts.size(); ++i) { RETURN_IF_ERROR(_conjuncts[i]->clone(_state, scanner_conjuncts[i])); diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h index d6e2407a8d2fba..cfffa5a50b30be 100644 --- a/be/src/exec/operator/scan_operator.h +++ b/be/src/exec/operator/scan_operator.h @@ -18,11 +18,11 @@ #pragma once #include -#include #include #include #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "core/field.h" #include "exec/common/util.hpp" #include "exec/operator/operator.h" @@ -126,7 +126,7 @@ class ScanLocalStateBase : public PipelineXLocalState<> { RuntimeProfile::Counter* _scan_rows = nullptr; RuntimeProfile::Counter* _scan_bytes = nullptr; - std::mutex _conjuncts_lock; + AnnotatedMutex _conjuncts_lock; RuntimeFilterConsumerHelper _helper; // magic number as seed to generate hash value for condition cache uint64_t _condition_cache_digest = 0; diff --git a/be/src/exec/pipeline/dependency.h b/be/src/exec/pipeline/dependency.h index 28c89b5b990ebf..2c6e3b9aef2361 100644 --- a/be/src/exec/pipeline/dependency.h +++ b/be/src/exec/pipeline/dependency.h @@ -35,6 +35,7 @@ #include "common/config.h" #include "common/logging.h" +#include "common/thread_safety_annotations.h" #include "core/block/block.h" #include "core/types.h" #include "exec/common/agg_utils.h" @@ -507,10 +508,10 @@ struct AnalyticSharedState : public BasicSharedState { public: AnalyticSharedState() = default; - std::queue blocks_buffer; - std::mutex buffer_mutex; - bool sink_eos = false; - std::mutex sink_eos_lock; + std::queue blocks_buffer GUARDED_BY(buffer_mutex); + AnnotatedMutex buffer_mutex; + bool sink_eos GUARDED_BY(sink_eos_lock) = false; + AnnotatedMutex sink_eos_lock; Arena agg_arena_pool; }; @@ -598,12 +599,12 @@ struct NestedLoopJoinSharedState : public JoinSharedState { struct PartitionSortNodeSharedState : public BasicSharedState { ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState) public: - 
std::queue blocks_buffer; - std::mutex buffer_mutex; + std::queue blocks_buffer GUARDED_BY(buffer_mutex); + AnnotatedMutex buffer_mutex; std::vector> partition_sorts; - bool sink_eos = false; - std::mutex sink_eos_lock; - std::mutex prepared_finish_lock; + bool sink_eos GUARDED_BY(sink_eos_lock) = false; + AnnotatedMutex sink_eos_lock; + AnnotatedMutex prepared_finish_lock; }; struct SetSharedState : public BasicSharedState { diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp index d50523605d0964..61589327909d63 100644 --- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp +++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp @@ -316,13 +316,16 @@ TEST_F(MultiCastDataStreamerTest, SpillTest) { output2.join(); output3.join(); - ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0); + { + LockGuard l(multi_cast_data_streamer->_mutex); + ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0); + } auto debug_string = multi_cast_data_streamer->debug_string(); EXPECT_TRUE(debug_string.find("MemSize:") != std::string::npos);