Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions include/dmn-blockingqueue-lf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ class Dmn_BlockingQueue_Lf
* queue.
*
* @param item The item to be enqueued.
*
* @throws std::runtime_error if the queue is shutting down when the push
* operation is attempted.
*/
Expand All @@ -257,6 +258,7 @@ class Dmn_BlockingQueue_Lf
* queue.
*
* @param item The item to be enqueued.
*
* @throws std::runtime_error if the queue is shutting down when the push
* operation is attempted.
*/
Expand Down Expand Up @@ -299,18 +301,18 @@ class Dmn_BlockingQueue_Lf
*/
template <std::atomic<Node *> Node::*NextField>
static auto freeNodeChain(Node *head) -> uint64_t {
uint64_t cnt = 0;
uint64_t count = 0;

while (head != nullptr) {
Node *next = (head->*NextField).load(std::memory_order_relaxed);

delete head;

head = next;
++cnt;
++count;
}

return cnt;
return count;
}

/**
Expand Down Expand Up @@ -533,7 +535,7 @@ template <typename T> void Dmn_BlockingQueue_Lf<T>::pushMove(T &&item) {
DMN_PROC_CLEANUP_PUSH(&Dmn_BlockingQueue_Lf<T>::cleanup_thunk_inflight,
&inflightTicket);

pushImpl(std::move_if_noexcept(item)); // move if noexcept, else copy
pushImpl(std::move(item)); // move

DMN_PROC_CLEANUP_POP(0);
}
Expand Down Expand Up @@ -679,14 +681,15 @@ auto Dmn_BlockingQueue_Lf<T>::enterInflightGuardFnc() -> uint64_t {
if (0 == count) {
auto ptr =
m_epochReclaimNode[index].load(std::memory_order_acquire);
if (nullptr != ptr &&
!m_epochReclaimNode[index].compare_exchange_strong(
ptr, nullptr, std::memory_order_release,
std::memory_order_acquire)) {
continue;
if (nullptr != ptr) {
if (!m_epochReclaimNode[index].compare_exchange_strong(
ptr, nullptr, std::memory_order_release,
std::memory_order_acquire)) {
continue;
}

freeRetiredNodeList(ptr);
}

freeRetiredNodeList(ptr);
}
}

Expand Down Expand Up @@ -729,14 +732,15 @@ void Dmn_BlockingQueue_Lf<T>::leaveInflightGuardFnc(
do {
auto ptr =
m_epochReclaimNode[epochIndex].load(std::memory_order_acquire);
if (nullptr != ptr &&
!m_epochReclaimNode[epochIndex].compare_exchange_strong(
ptr, nullptr, std::memory_order_release,
std::memory_order_acquire)) {
continue;
}
if (nullptr != ptr) {
if (!m_epochReclaimNode[epochIndex].compare_exchange_strong(
ptr, nullptr, std::memory_order_release,
std::memory_order_acquire)) {
continue;
}

freeRetiredNodeList(ptr);
freeRetiredNodeList(ptr);
}

break;
} while (true);
Expand Down
8 changes: 5 additions & 3 deletions include/dmn-blockingqueue-mt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,6 @@ Dmn_BlockingQueue_Mt<T>::~Dmn_BlockingQueue_Mt() noexcept try {
if (!isShutdown()) {
shutdown();
}

// make sure that all api call within the inflight guard exits.
this->waitForEmptyInflight();
} catch (...) {
// Destructors must be noexcept: swallow exceptions.
return;
Expand Down Expand Up @@ -300,10 +297,15 @@ void Dmn_BlockingQueue_Mt<T>::pushImpl(U &&item) {
}

template <typename T> void Dmn_BlockingQueue_Mt<T>::shutdown() {
// 1. set shutdown flag
Dmn_BlockingQueue<Dmn_BlockingQueue_Mt<T>, T>::shutdown();

// 2. notify all the threads blocking waiting
m_empty_cond.notify_all();
m_not_empty_cond.notify_all();

// 3. wait for all threads that already "entered the epoch" to leave
this->waitForEmptyInflight();
}

template <typename T> auto Dmn_BlockingQueue_Mt<T>::waitForEmpty() -> uint64_t {
Expand Down
6 changes: 2 additions & 4 deletions include/dmn-blockingqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,10 @@ template <typename Derived, typename T> class Dmn_BlockingQueue {
virtual void push(const T &item) final;

/**
* @brief Enqueue an rvalue item into the tail of the queue, preferring move
* @brief Enqueue an rvalue item into the tail of the queue via move
* semantics.
*
* @param item The rvalue item to be enqueued. Implementations may
* internally fall back to copying (e.g., when using std::move_if_noexcept for
* types with throwing move constructors).
* @param item The rvalue item to be enqueued.
*/
virtual void push(T &&item) final;

Expand Down
4 changes: 2 additions & 2 deletions include/dmn-inflight-guard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
* - Override isInflightGuardClosed() to reflect shutdown state.
* - Optionally override enterInflightGuardFnc()/leaveInflightGuardFnc() if
* per-call bookkeeping is required (e.g. epoch-based reclamation).
* - Public APIs should acquire a ticket early (typically at the start of
* - Public APIs should acquire a ticket early (e.g. at the start of
* push/pop) and keep it alive until the function returns.
* - shutdown()/destructor should close the inflight guard first, then call
* waitForEmptyInflight() before freeing shared resources that in-flight calls
Expand Down Expand Up @@ -214,7 +214,7 @@ template <class T = std::monostate> class Dmn_Inflight_Guard {

virtual void leaveInflightGuardFnc(const T &) noexcept {}

auto inflight_count() const -> uint64_t {
virtual auto inflight_count() const -> uint64_t final {
return m_inflight_count.load(std::memory_order_acquire);
}

Expand Down
Loading