From 457db198df89a1a4e4b1bc34dd301634a93f2906 Mon Sep 17 00:00:00 2001 From: Lars Froehlich Date: Wed, 19 Feb 2025 15:29:07 +0100 Subject: [PATCH 1/4] ThreadPool: Add get_thread_index() [why] There are use cases in which it is beneficial to know on which of the N pool threads a function gets executed. [how] Store a thread-local thread index when launching the threads. Return this index when called from a worker thread. For other threads, the thread-local index gets initialized to the maximum value of size_t; if this value is found, get_thread_index() throws an exception. Co-authored-by: Soeren Grunewald Signed-off-by: Lars Froehlich --- include/gul14/ThreadPool.h | 26 ++++++++++++++++++++++++-- src/ThreadPool.cc | 23 +++++++++++++++++++---- tests/test_ThreadPool.cc | 35 ++++++++++++++++++++++++++++++++++- 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/include/gul14/ThreadPool.h b/include/gul14/ThreadPool.h index 7a0391f1..bd48d05e 100644 --- a/include/gul14/ThreadPool.h +++ b/include/gul14/ThreadPool.h @@ -4,7 +4,7 @@ * \date Created on November 6, 2018 * \brief Declaration of the ThreadPool class. * - * \copyright Copyright 2018-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg + * \copyright Copyright 2018-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -418,6 +419,18 @@ class ThreadPool : public std::enable_shared_from_this GUL_EXPORT std::vector get_running_task_names() const; + /** + * If called from a worker thread, return the zero-based index of the thread in the + * pool (0 for the first thread, 1 for the second, and so on). + * + * \exception std::runtime_error is thrown if this function is called from a thread + * that is not part of the pool. + * + * \since GUL version 2.13 + */ + GUL_EXPORT + std::size_t get_thread_index() const; + /// Determine whether the queue for pending tasks is full (at capacity). GUL_EXPORT bool is_full() const noexcept; @@ -513,6 +526,13 @@ class ThreadPool : public std::enable_shared_from_this */ std::vector threads_; + /** + * For worker threads, this is the index of the thread in the threads_ vector. + * For other threads, the value is meaningless and the variable is initialized to + * numeric_limits::max(). + */ + thread_local static std::size_t thread_index_; + /** * A condition variable used together with mutex_ to wake up a worker thread when a * new task is added (or when shutdown is requested). @@ -583,8 +603,10 @@ class ThreadPool : public std::enable_shared_from_this /** * The main loop run in the thread; picks one task off the queue and executes it, then * repeats until asked to quit. + * + * \param thread_index Index of the thread in the threads_ vector */ - void perform_work(); + void perform_work(std::size_t thread_index); }; /** diff --git a/src/ThreadPool.cc b/src/ThreadPool.cc index b125758b..f7420e98 100644 --- a/src/ThreadPool.cc +++ b/src/ThreadPool.cc @@ -4,7 +4,7 @@ * \date Created on November 6, 2018 * \brief Implementation of the ThreadPool class. * - * \copyright Copyright 2018-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg + * \copyright Copyright 2018-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published @@ -21,6 +21,7 @@ */ #include +#include #include #include @@ -61,8 +62,8 @@ ThreadPool::ThreadPool(std::size_t num_threads, std::size_t capacity) throw std::invalid_argument(cat("Illegal capacity for thread pool: ", capacity)); threads_.reserve(num_threads); - while (threads_.size() < num_threads) - threads_.emplace_back([this]() { perform_work(); }); + for (std::size_t i = 0; i != num_threads; ++i) + threads_.emplace_back([this, i]() { perform_work(i); }); } ThreadPool::~ThreadPool() @@ -134,6 +135,14 @@ std::vector ThreadPool::get_running_task_names() const return running_task_names_; } +std::size_t ThreadPool::get_thread_index() const +{ + if (thread_index_ == std::numeric_limits::max()) + throw std::runtime_error("This thread is not part of the pool"); + + return thread_index_; +} + bool ThreadPool::is_full() const noexcept { std::lock_guard lock(mutex_); @@ -182,7 +191,7 @@ std::shared_ptr ThreadPool::make_shared( return std::shared_ptr(new ThreadPool(num_threads, capacity)); } -void ThreadPool::perform_work() +void ThreadPool::perform_work(const std::size_t thread_index) { #if defined(__APPLE__) || defined(__GNUC__) // On unixoid systems, we block a number of signals in the worker threads because we @@ -203,6 +212,9 @@ void ThreadPool::perform_work() pthread_sigmask(SIG_BLOCK, &mask, 0); #endif + // Assign thread-local thread index + thread_index_ = thread_index; + std::unique_lock lock(mutex_); while (!shutdown_requested_) @@ -264,4 +276,7 @@ void ThreadPool::perform_work() } } +thread_local std::size_t +ThreadPool::thread_index_{ std::numeric_limits::max() }; + } // namespace gul14 diff --git a/tests/test_ThreadPool.cc b/tests/test_ThreadPool.cc index 3cbb0f28..011f6a7c 100644 --- a/tests/test_ThreadPool.cc +++ b/tests/test_ThreadPool.cc @@ -4,7 +4,7 @@ * \date Created on March 17, 2023 * \brief Declaration of the ThreadPool class. * - * \copyright Copyright 2023-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg + * \copyright Copyright 2023-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published @@ -22,12 +22,14 @@ // SPDX-License-Identifier: LGPL-2.1-or-later +#include #include #include #include "gul14/catch.h" #include "gul14/ThreadPool.h" #include "gul14/time_util.h" +#include "gul14/Trigger.h" using namespace gul14; using namespace std::literals; @@ -450,6 +452,37 @@ TEST_CASE("ThreadPool: get_running_task_names()", "[ThreadPool]") pool.reset(); } +TEST_CASE("ThreadPool: get_thread_index()", "[ThreadPool]") +{ + std::array, 2> indices; + Trigger trigger; + + auto pool = make_thread_pool(2); + + pool->add_task( + [&](ThreadPool& p) { trigger.wait(); indices[0] = p.get_thread_index(); }); + pool->add_task( + [&](ThreadPool& p) { trigger.wait(); indices[1] = p.get_thread_index(); }); + + while (pool->count_pending() > 0) + gul14::sleep(1ms); + + trigger = true; + + // From a non-worker thread, the function must throw. + REQUIRE_THROWS_AS(pool->get_thread_index(), std::runtime_error); + + pool.reset(); + + CAPTURE(indices); + REQUIRE(indices[0] <= 1); + REQUIRE(indices[1] <= 1); + if (indices[0] == 0) + REQUIRE(indices[1] == 1); + if (indices[0] == 1) + REQUIRE(indices[1] == 0); +} + TEST_CASE("ThreadPool: is_full()", "[ThreadPool]") { auto pool = make_thread_pool(1); From ea0bddda5e0048278b4360b7eb1463c40763a58f Mon Sep 17 00:00:00 2001 From: Lars Froehlich Date: Wed, 19 Feb 2025 15:51:29 +0100 Subject: [PATCH 2/4] ThreadPool: Restore alphabetic ordering of member functions Signed-off-by: Lars Froehlich --- src/ThreadPool.cc | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/ThreadPool.cc b/src/ThreadPool.cc index f7420e98..89caec9d 100644 --- a/src/ThreadPool.cc +++ b/src/ThreadPool.cc @@ -135,6 +135,24 @@ std::vector ThreadPool::get_running_task_names() const return running_task_names_; } +ThreadPool::InternalTaskState ThreadPool::get_task_state(const TaskId task_id) const +{ + std::lock_guard lock(mutex_); + + const auto itr = std::find( + running_task_ids_.begin(), running_task_ids_.end(), task_id); + if (itr != running_task_ids_.end()) + return InternalTaskState::running; + + const auto itp = std::find_if( + pending_tasks_.begin(), pending_tasks_.end(), + [task_id](const Task& t) { return t.id_ == task_id; }); + if (itp != pending_tasks_.end()) + return InternalTaskState::pending; + + return InternalTaskState::unknown; +} + std::size_t ThreadPool::get_thread_index() const { if (thread_index_ == std::numeric_limits::max()) @@ -160,24 +178,6 @@ bool ThreadPool::is_idle() const return pending_tasks_.empty() && running_task_ids_.empty(); } -ThreadPool::InternalTaskState ThreadPool::get_task_state(const TaskId task_id) const -{ - std::lock_guard lock(mutex_); - - const auto itr = std::find( - running_task_ids_.begin(), running_task_ids_.end(), task_id); - if (itr != running_task_ids_.end()) - return InternalTaskState::running; - - const auto itp = std::find_if( - pending_tasks_.begin(), pending_tasks_.end(), - [task_id](const Task& t) { return t.id_ == task_id; }); - if (itp != pending_tasks_.end()) - return InternalTaskState::pending; - - return InternalTaskState::unknown; -} - bool ThreadPool::is_shutdown_requested() const { std::lock_guard lock(mutex_); From 9f161c17a3f7b43ba468eaaa07d6c86674904bc1 Mon Sep 17 00:00:00 2001 From: Lars Froehlich Date: Wed, 19 Feb 2025 15:53:37 +0100 Subject: [PATCH 3/4] Bump version to 2.13 and document changes in ThreadPool [why] We have extended the API by adding ThreadPool::get_thread_index(). Signed-off-by: Lars Froehlich --- data/doxygen.h | 4 ++++ meson.build | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/data/doxygen.h b/data/doxygen.h index 83f2fc55..e561431a 100644 --- a/data/doxygen.h +++ b/data/doxygen.h @@ -144,6 +144,10 @@ namespace gul14 { * * \section changelog_2_x 2.x Versions * + * \subsection V2_13_0 Version 2.13.0 + * + * - Add ThreadPool::get_thread_index() + * * \subsection V2_12_1 Version 2.12.1 * * - Allow mutable function objects in ThreadPool::add_task() diff --git a/meson.build b/meson.build index 563acf4c..cf44f674 100644 --- a/meson.build +++ b/meson.build @@ -8,7 +8,7 @@ project('gul14', 'cpp', 'cpp_std=c++14', 'warning_level=3', ], - version : '2.12', + version : '2.13', meson_version : '>=0.49') # Enforce that the version number is according to specs From 3f915d359734cc471adb3530bb4092f9e2ef073f Mon Sep 17 00:00:00 2001 From: Lars Froehlich Date: Thu, 27 Feb 2025 13:34:38 +0100 Subject: [PATCH 4/4] Rename "thread index" to "thread id" [why] To avoid this being associated with an index into some array, because users cannot actually index anything in the ThreadPool. [how] Introduce a member type ThreadId and rename member functions and variables accordingly (get_thread_id() etc.). Proposed-by: Ulf Fini Jastrow Signed-off-by: Lars Froehlich --- data/doxygen.h | 4 ++-- include/gul14/ThreadPool.h | 14 +++++++++----- src/ThreadPool.cc | 16 ++++++++-------- tests/test_ThreadPool.cc | 10 +++++----- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/data/doxygen.h b/data/doxygen.h index e561431a..2825a3e0 100644 --- a/data/doxygen.h +++ b/data/doxygen.h @@ -146,7 +146,7 @@ namespace gul14 { * * \subsection V2_13_0 Version 2.13.0 * - * - Add ThreadPool::get_thread_index() + * - Add ThreadPool::get_thread_id() * * \subsection V2_12_1 Version 2.12.1 * @@ -758,7 +758,7 @@ namespace gul14 { * tests. It bundles all of its functionality in a single header file. For convenience, * the GUL-internal version of this header can be accessed via: * \code - * #include "gul14/catch.h" + * #include * // Your unit tests here * \endcode * Please refer to https://github.com/catchorg/Catch2/blob/master/docs/tutorial.md diff --git a/include/gul14/ThreadPool.h b/include/gul14/ThreadPool.h index bd48d05e..9eaffcbf 100644 --- a/include/gul14/ThreadPool.h +++ b/include/gul14/ThreadPool.h @@ -113,6 +113,9 @@ class ThreadPool : public std::enable_shared_from_this /// A unique identifier for a task. using TaskId = std::uint64_t; + /// A unique identifier for a thread in the pool in the range of [0, count_threads()). + using ThreadId = std::vector::size_type; + /** * A handle for a task that has (or had) been enqueued on a ThreadPool. * @@ -420,8 +423,9 @@ class ThreadPool : public std::enable_shared_from_this std::vector get_running_task_names() const; /** - * If called from a worker thread, return the zero-based index of the thread in the - * pool (0 for the first thread, 1 for the second, and so on). + * Return the thread pool ID of the current thread. + * + * \returns a thread ID in the range [0, count_threads()). * * \exception std::runtime_error is thrown if this function is called from a thread * that is not part of the pool. @@ -429,7 +433,7 @@ class ThreadPool : public std::enable_shared_from_this * \since GUL version 2.13 */ GUL_EXPORT - std::size_t get_thread_index() const; + ThreadId get_thread_id() const; /// Determine whether the queue for pending tasks is full (at capacity). GUL_EXPORT @@ -529,9 +533,9 @@ class ThreadPool : public std::enable_shared_from_this /** * For worker threads, this is the index of the thread in the threads_ vector. * For other threads, the value is meaningless and the variable is initialized to - * numeric_limits::max(). + * numeric_limits::max(). */ - thread_local static std::size_t thread_index_; + thread_local static ThreadId thread_id_; /** * A condition variable used together with mutex_ to wake up a worker thread when a diff --git a/src/ThreadPool.cc b/src/ThreadPool.cc index 89caec9d..bd4a5681 100644 --- a/src/ThreadPool.cc +++ b/src/ThreadPool.cc @@ -153,12 +153,12 @@ ThreadPool::InternalTaskState ThreadPool::get_task_state(const TaskId task_id) c return InternalTaskState::unknown; } -std::size_t ThreadPool::get_thread_index() const +ThreadPool::ThreadId ThreadPool::get_thread_id() const { - if (thread_index_ == std::numeric_limits::max()) + if (thread_id_ == std::numeric_limits::max()) throw std::runtime_error("This thread is not part of the pool"); - return thread_index_; + return thread_id_; } bool ThreadPool::is_full() const noexcept @@ -191,7 +191,7 @@ std::shared_ptr ThreadPool::make_shared( return std::shared_ptr(new ThreadPool(num_threads, capacity)); } -void ThreadPool::perform_work(const std::size_t thread_index) +void ThreadPool::perform_work(const ThreadPool::ThreadId thread_id) { #if defined(__APPLE__) || defined(__GNUC__) // On unixoid systems, we block a number of signals in the worker threads because we @@ -212,8 +212,8 @@ void ThreadPool::perform_work(const std::size_t thread_index) pthread_sigmask(SIG_BLOCK, &mask, 0); #endif - // Assign thread-local thread index - thread_index_ = thread_index; + // Assign thread-local thread ID + thread_id_ = thread_id; std::unique_lock lock(mutex_); @@ -276,7 +276,7 @@ void ThreadPool::perform_work(const std::size_t thread_index) } } -thread_local std::size_t -ThreadPool::thread_index_{ std::numeric_limits::max() }; +thread_local ThreadPool::ThreadId +ThreadPool::thread_id_{ std::numeric_limits::max() }; } // namespace gul14 diff --git a/tests/test_ThreadPool.cc b/tests/test_ThreadPool.cc index 011f6a7c..761035ed 100644 --- a/tests/test_ThreadPool.cc +++ b/tests/test_ThreadPool.cc @@ -452,17 +452,17 @@ TEST_CASE("ThreadPool: get_running_task_names()", "[ThreadPool]") pool.reset(); } -TEST_CASE("ThreadPool: get_thread_index()", "[ThreadPool]") +TEST_CASE("ThreadPool: get_thread_id()", "[ThreadPool]") { - std::array, 2> indices; + std::array, 2> indices; Trigger trigger; auto pool = make_thread_pool(2); pool->add_task( - [&](ThreadPool& p) { trigger.wait(); indices[0] = p.get_thread_index(); }); + [&](ThreadPool& p) { trigger.wait(); indices[0] = p.get_thread_id(); }); pool->add_task( - [&](ThreadPool& p) { trigger.wait(); indices[1] = p.get_thread_index(); }); + [&](ThreadPool& p) { trigger.wait(); indices[1] = p.get_thread_id(); }); while (pool->count_pending() > 0) gul14::sleep(1ms); @@ -470,7 +470,7 @@ TEST_CASE("ThreadPool: get_thread_index()", "[ThreadPool]") trigger = true; // From a non-worker thread, the function must throw. - REQUIRE_THROWS_AS(pool->get_thread_index(), std::runtime_error); + REQUIRE_THROWS_AS(pool->get_thread_id(), std::runtime_error); pool.reset();