diff --git a/data/doxygen.h b/data/doxygen.h index 83f2fc55..2825a3e0 100644 --- a/data/doxygen.h +++ b/data/doxygen.h @@ -144,6 +144,10 @@ namespace gul14 { * * \section changelog_2_x 2.x Versions * + * \subsection V2_13_0 Version 2.13.0 + * + * - Add ThreadPool::get_thread_id() + * * \subsection V2_12_1 Version 2.12.1 * * - Allow mutable function objects in ThreadPool::add_task() @@ -754,7 +758,7 @@ namespace gul14 { * tests. It bundles all of its functionality in a single header file. For convenience, * the GUL-internal version of this header can be accessed via: * \code - * #include "gul14/catch.h" + * #include * // Your unit tests here * \endcode * Please refer to https://github.com/catchorg/Catch2/blob/master/docs/tutorial.md diff --git a/include/gul14/ThreadPool.h b/include/gul14/ThreadPool.h index 7a0391f1..9eaffcbf 100644 --- a/include/gul14/ThreadPool.h +++ b/include/gul14/ThreadPool.h @@ -4,7 +4,7 @@ * \date Created on November 6, 2018 * \brief Declaration of the ThreadPool class. * - * \copyright Copyright 2018-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg + * \copyright Copyright 2018-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -112,6 +113,9 @@ class ThreadPool : public std::enable_shared_from_this /// A unique identifier for a task. using TaskId = std::uint64_t; + /// A unique identifier for a thread in the pool in the range of [0, count_threads()). + using ThreadId = std::vector::size_type; + /** * A handle for a task that has (or had) been enqueued on a ThreadPool. * @@ -418,6 +422,19 @@ class ThreadPool : public std::enable_shared_from_this GUL_EXPORT std::vector get_running_task_names() const; + /** + * Return the thread pool ID of the current thread. + * + * \returns a thread ID in the range [0, count_threads()). + * + * \exception std::runtime_error is thrown if this function is called from a thread + * that is not part of the pool. + * + * \since GUL version 2.13 + */ + GUL_EXPORT + ThreadId get_thread_id() const; + /// Determine whether the queue for pending tasks is full (at capacity). GUL_EXPORT bool is_full() const noexcept; @@ -513,6 +530,13 @@ class ThreadPool : public std::enable_shared_from_this */ std::vector threads_; + /** + * For worker threads, this is the index of the thread in the threads_ vector. + * For other threads, the value is meaningless and the variable is initialized to + * numeric_limits::max(). + */ + thread_local static ThreadId thread_id_; + /** * A condition variable used together with mutex_ to wake up a worker thread when a * new task is added (or when shutdown is requested). @@ -583,8 +607,10 @@ class ThreadPool : public std::enable_shared_from_this /** * The main loop run in the thread; picks one task off the queue and executes it, then * repeats until asked to quit. + * + * \param thread_index Index of the thread in the threads_ vector */ - void perform_work(); + void perform_work(std::size_t thread_index); }; /** diff --git a/meson.build b/meson.build index 563acf4c..cf44f674 100644 --- a/meson.build +++ b/meson.build @@ -8,7 +8,7 @@ project('gul14', 'cpp', 'cpp_std=c++14', 'warning_level=3', ], - version : '2.12', + version : '2.13', meson_version : '>=0.49') # Enforce that the version number is according to specs diff --git a/src/ThreadPool.cc b/src/ThreadPool.cc index b125758b..bd4a5681 100644 --- a/src/ThreadPool.cc +++ b/src/ThreadPool.cc @@ -4,7 +4,7 @@ * \date Created on November 6, 2018 * \brief Implementation of the ThreadPool class. * - * \copyright Copyright 2018-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg + * \copyright Copyright 2018-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published @@ -21,6 +21,7 @@ */ #include +#include #include #include @@ -61,8 +62,8 @@ ThreadPool::ThreadPool(std::size_t num_threads, std::size_t capacity) throw std::invalid_argument(cat("Illegal capacity for thread pool: ", capacity)); threads_.reserve(num_threads); - while (threads_.size() < num_threads) - threads_.emplace_back([this]() { perform_work(); }); + for (std::size_t i = 0; i != num_threads; ++i) + threads_.emplace_back([this, i]() { perform_work(i); }); } ThreadPool::~ThreadPool() @@ -134,23 +135,6 @@ std::vector ThreadPool::get_running_task_names() const return running_task_names_; } -bool ThreadPool::is_full() const noexcept -{ - std::lock_guard lock(mutex_); - return is_full_i(); -} - -bool ThreadPool::is_full_i() const noexcept -{ - return pending_tasks_.size() >= capacity_; -} - -bool ThreadPool::is_idle() const -{ - std::lock_guard lock(mutex_); - return pending_tasks_.empty() && running_task_ids_.empty(); -} - ThreadPool::InternalTaskState ThreadPool::get_task_state(const TaskId task_id) const { std::lock_guard lock(mutex_); @@ -169,6 +153,31 @@ ThreadPool::InternalTaskState ThreadPool::get_task_state(const TaskId task_id) c return InternalTaskState::unknown; } +ThreadPool::ThreadId ThreadPool::get_thread_id() const +{ + if (thread_id_ == std::numeric_limits::max()) + throw std::runtime_error("This thread is not part of the pool"); + + return thread_id_; +} + +bool ThreadPool::is_full() const noexcept +{ + std::lock_guard lock(mutex_); + return is_full_i(); +} + +bool ThreadPool::is_full_i() const noexcept +{ + return pending_tasks_.size() >= capacity_; +} + +bool ThreadPool::is_idle() const +{ + std::lock_guard lock(mutex_); + return pending_tasks_.empty() && running_task_ids_.empty(); +} + bool ThreadPool::is_shutdown_requested() const { std::lock_guard lock(mutex_); @@ -182,7 +191,7 @@ std::shared_ptr ThreadPool::make_shared( return std::shared_ptr(new ThreadPool(num_threads, capacity)); } -void ThreadPool::perform_work() +void ThreadPool::perform_work(const ThreadPool::ThreadId thread_id) { #if defined(__APPLE__) || defined(__GNUC__) // On unixoid systems, we block a number of signals in the worker threads because we @@ -203,6 +212,9 @@ void ThreadPool::perform_work() pthread_sigmask(SIG_BLOCK, &mask, 0); #endif + // Assign thread-local thread ID + thread_id_ = thread_id; + std::unique_lock lock(mutex_); while (!shutdown_requested_) @@ -264,4 +276,7 @@ void ThreadPool::perform_work() } } +thread_local ThreadPool::ThreadId +ThreadPool::thread_id_{ std::numeric_limits::max() }; + } // namespace gul14 diff --git a/tests/test_ThreadPool.cc b/tests/test_ThreadPool.cc index 3cbb0f28..761035ed 100644 --- a/tests/test_ThreadPool.cc +++ b/tests/test_ThreadPool.cc @@ -4,7 +4,7 @@ * \date Created on March 17, 2023 * \brief Declaration of the ThreadPool class. * - * \copyright Copyright 2023-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg + * \copyright Copyright 2023-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published @@ -22,12 +22,14 @@ // SPDX-License-Identifier: LGPL-2.1-or-later +#include #include #include #include "gul14/catch.h" #include "gul14/ThreadPool.h" #include "gul14/time_util.h" +#include "gul14/Trigger.h" using namespace gul14; using namespace std::literals; @@ -450,6 +452,37 @@ TEST_CASE("ThreadPool: get_running_task_names()", "[ThreadPool]") pool.reset(); } +TEST_CASE("ThreadPool: get_thread_id()", "[ThreadPool]") +{ + std::array, 2> indices; + Trigger trigger; + + auto pool = make_thread_pool(2); + + pool->add_task( + [&](ThreadPool& p) { trigger.wait(); indices[0] = p.get_thread_id(); }); + pool->add_task( + [&](ThreadPool& p) { trigger.wait(); indices[1] = p.get_thread_id(); }); + + while (pool->count_pending() > 0) + gul14::sleep(1ms); + + trigger = true; + + // From a non-worker thread, the function must throw. + REQUIRE_THROWS_AS(pool->get_thread_id(), std::runtime_error); + + pool.reset(); + + CAPTURE(indices); + REQUIRE(indices[0] <= 1); + REQUIRE(indices[1] <= 1); + if (indices[0] == 0) + REQUIRE(indices[1] == 1); + if (indices[0] == 1) + REQUIRE(indices[1] == 0); +} + TEST_CASE("ThreadPool: is_full()", "[ThreadPool]") { auto pool = make_thread_pool(1);