Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion data/doxygen.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ namespace gul14 {
*
* \section changelog_2_x 2.x Versions
*
* \subsection V2_13_0 Version 2.13.0
*
* - Add ThreadPool::get_thread_id()
*
* \subsection V2_12_1 Version 2.12.1
*
* - Allow mutable function objects in ThreadPool::add_task()
Expand Down Expand Up @@ -754,7 +758,7 @@ namespace gul14 {
* tests. It bundles all of its functionality in a single header file. For convenience,
* the GUL-internal version of this header can be accessed via:
* \code
* #include "gul14/catch.h"
* #include <catch2/catch_all.hpp>
* // Your unit tests here
* \endcode
* Please refer to https://github.com/catchorg/Catch2/blob/master/docs/tutorial.md
Expand Down
30 changes: 28 additions & 2 deletions include/gul14/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* \date Created on November 6, 2018
* \brief Declaration of the ThreadPool class.
*
* \copyright Copyright 2018-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg
* \copyright Copyright 2018-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
Expand All @@ -29,6 +29,7 @@
#include <condition_variable>
#include <functional>
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <stdexcept>
Expand Down Expand Up @@ -112,6 +113,9 @@ class ThreadPool : public std::enable_shared_from_this<ThreadPool>
/// A unique identifier for a task.
using TaskId = std::uint64_t;

/// A unique identifier for a thread in the pool in the range of [0, count_threads()).
using ThreadId = std::vector<std::thread>::size_type;

/**
* A handle for a task that has (or had) been enqueued on a ThreadPool.
*
Expand Down Expand Up @@ -418,6 +422,19 @@ class ThreadPool : public std::enable_shared_from_this<ThreadPool>
GUL_EXPORT
std::vector<std::string> get_running_task_names() const;

/**
* Return the thread pool ID of the current thread.
*
* \returns a thread ID in the range [0, count_threads()).
*
* \exception std::runtime_error is thrown if this function is called from a thread
* that is not part of the pool.
*
* \since GUL version 2.13
*/
GUL_EXPORT
ThreadId get_thread_id() const;

/// Determine whether the queue for pending tasks is full (at capacity).
GUL_EXPORT
bool is_full() const noexcept;
Expand Down Expand Up @@ -513,6 +530,13 @@ class ThreadPool : public std::enable_shared_from_this<ThreadPool>
*/
std::vector<std::thread> threads_;

/**
* For worker threads, this is the index of the thread in the threads_ vector.
* For other threads, the value is meaningless and the variable is initialized to
* numeric_limits<ThreadId>::max().
*/
thread_local static ThreadId thread_id_;

/**
* A condition variable used together with mutex_ to wake up a worker thread when a
* new task is added (or when shutdown is requested).
Expand Down Expand Up @@ -583,8 +607,10 @@ class ThreadPool : public std::enable_shared_from_this<ThreadPool>
/**
* The main loop run in the thread; picks one task off the queue and executes it, then
* repeats until asked to quit.
*
* \param thread_index Index of the thread in the threads_ vector
*/
void perform_work();
void perform_work(std::size_t thread_index);
};

/**
Expand Down
2 changes: 1 addition & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ project('gul14', 'cpp',
'cpp_std=c++14',
'warning_level=3',
],
version : '2.12',
version : '2.13',
meson_version : '>=0.49')

# Enforce that the version number is according to specs
Expand Down
57 changes: 36 additions & 21 deletions src/ThreadPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* \date Created on November 6, 2018
* \brief Implementation of the ThreadPool class.
*
* \copyright Copyright 2018-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg
* \copyright Copyright 2018-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
Expand All @@ -21,6 +21,7 @@
*/

#include <algorithm>
#include <limits>

#include <gul14/cat.h>
#include <gul14/ThreadPool.h>
Expand Down Expand Up @@ -61,8 +62,8 @@ ThreadPool::ThreadPool(std::size_t num_threads, std::size_t capacity)
throw std::invalid_argument(cat("Illegal capacity for thread pool: ", capacity));

threads_.reserve(num_threads);
while (threads_.size() < num_threads)
threads_.emplace_back([this]() { perform_work(); });
for (std::size_t i = 0; i != num_threads; ++i)
threads_.emplace_back([this, i]() { perform_work(i); });
}

ThreadPool::~ThreadPool()
Expand Down Expand Up @@ -134,23 +135,6 @@ std::vector<std::string> ThreadPool::get_running_task_names() const
return running_task_names_;
}

bool ThreadPool::is_full() const noexcept
{
std::lock_guard<std::mutex> lock(mutex_);
return is_full_i();
}

bool ThreadPool::is_full_i() const noexcept
{
return pending_tasks_.size() >= capacity_;
}

bool ThreadPool::is_idle() const
{
std::lock_guard<std::mutex> lock(mutex_);
return pending_tasks_.empty() && running_task_ids_.empty();
}

ThreadPool::InternalTaskState ThreadPool::get_task_state(const TaskId task_id) const
{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -169,6 +153,31 @@ ThreadPool::InternalTaskState ThreadPool::get_task_state(const TaskId task_id) c
return InternalTaskState::unknown;
}

ThreadPool::ThreadId ThreadPool::get_thread_id() const
{
if (thread_id_ == std::numeric_limits<ThreadId>::max())
throw std::runtime_error("This thread is not part of the pool");

return thread_id_;
}

bool ThreadPool::is_full() const noexcept
{
std::lock_guard<std::mutex> lock(mutex_);
return is_full_i();
}

bool ThreadPool::is_full_i() const noexcept
{
return pending_tasks_.size() >= capacity_;
}

bool ThreadPool::is_idle() const
{
std::lock_guard<std::mutex> lock(mutex_);
return pending_tasks_.empty() && running_task_ids_.empty();
}

bool ThreadPool::is_shutdown_requested() const
{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -182,7 +191,7 @@ std::shared_ptr<ThreadPool> ThreadPool::make_shared(
return std::shared_ptr<ThreadPool>(new ThreadPool(num_threads, capacity));
}

void ThreadPool::perform_work()
void ThreadPool::perform_work(const ThreadPool::ThreadId thread_id)
{
#if defined(__APPLE__) || defined(__GNUC__)
// On unixoid systems, we block a number of signals in the worker threads because we
Expand All @@ -203,6 +212,9 @@ void ThreadPool::perform_work()
pthread_sigmask(SIG_BLOCK, &mask, 0);
#endif

// Assign thread-local thread ID
thread_id_ = thread_id;

std::unique_lock<std::mutex> lock(mutex_);

while (!shutdown_requested_)
Expand Down Expand Up @@ -264,4 +276,7 @@ void ThreadPool::perform_work()
}
}

thread_local ThreadPool::ThreadId
ThreadPool::thread_id_{ std::numeric_limits<ThreadPool::ThreadId>::max() };

} // namespace gul14
35 changes: 34 additions & 1 deletion tests/test_ThreadPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* \date Created on March 17, 2023
* \brief Declaration of the ThreadPool class.
*
* \copyright Copyright 2023-2024 Deutsches Elektronen-Synchrotron (DESY), Hamburg
* \copyright Copyright 2023-2025 Deutsches Elektronen-Synchrotron (DESY), Hamburg
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
Expand All @@ -22,12 +22,14 @@

// SPDX-License-Identifier: LGPL-2.1-or-later

#include <array>
#include <atomic>
#include <stdexcept>

#include "gul14/catch.h"
#include "gul14/ThreadPool.h"
#include "gul14/time_util.h"
#include "gul14/Trigger.h"

using namespace gul14;
using namespace std::literals;
Expand Down Expand Up @@ -450,6 +452,37 @@ TEST_CASE("ThreadPool: get_running_task_names()", "[ThreadPool]")
pool.reset();
}

TEST_CASE("ThreadPool: get_thread_id()", "[ThreadPool]")
{
std::array<std::atomic<ThreadPool::ThreadId>, 2> indices;
Trigger trigger;

auto pool = make_thread_pool(2);

pool->add_task(
[&](ThreadPool& p) { trigger.wait(); indices[0] = p.get_thread_id(); });
pool->add_task(
[&](ThreadPool& p) { trigger.wait(); indices[1] = p.get_thread_id(); });

while (pool->count_pending() > 0)
gul14::sleep(1ms);

trigger = true;

// From a non-worker thread, the function must throw.
REQUIRE_THROWS_AS(pool->get_thread_id(), std::runtime_error);

pool.reset();

CAPTURE(indices);
REQUIRE(indices[0] <= 1);
REQUIRE(indices[1] <= 1);
if (indices[0] == 0)
REQUIRE(indices[1] == 1);
if (indices[0] == 1)
REQUIRE(indices[1] == 0);
}

TEST_CASE("ThreadPool: is_full()", "[ThreadPool]")
{
auto pool = make_thread_pool(1);
Expand Down