diff --git a/include/dmn-blockingqueue-lf.hpp b/include/dmn-blockingqueue-lf.hpp index 65c6356..76aec1c 100644 --- a/include/dmn-blockingqueue-lf.hpp +++ b/include/dmn-blockingqueue-lf.hpp @@ -247,6 +247,7 @@ class Dmn_BlockingQueue_Lf * queue. * * @param item The item to be enqueued. + * * @throws std::runtime_error if the queue is shutting down when the push * operation is attempted. */ @@ -257,6 +258,7 @@ class Dmn_BlockingQueue_Lf * queue. * * @param item The item to be enqueued. + * * @throws std::runtime_error if the queue is shutting down when the push * operation is attempted. */ @@ -299,7 +301,7 @@ class Dmn_BlockingQueue_Lf */ template Node::*NextField> static auto freeNodeChain(Node *head) -> uint64_t { - uint64_t cnt = 0; + uint64_t count = 0; while (head != nullptr) { Node *next = (head->*NextField).load(std::memory_order_relaxed); @@ -307,10 +309,10 @@ class Dmn_BlockingQueue_Lf delete head; head = next; - ++cnt; + ++count; } - return cnt; + return count; } /** @@ -533,7 +535,7 @@ template void Dmn_BlockingQueue_Lf::pushMove(T &&item) { DMN_PROC_CLEANUP_PUSH(&Dmn_BlockingQueue_Lf::cleanup_thunk_inflight, &inflightTicket); - pushImpl(std::move_if_noexcept(item)); // move if noexcept, else copy + pushImpl(std::move(item)); // move DMN_PROC_CLEANUP_POP(0); } @@ -679,14 +681,15 @@ auto Dmn_BlockingQueue_Lf::enterInflightGuardFnc() -> uint64_t { if (0 == count) { auto ptr = m_epochReclaimNode[index].load(std::memory_order_acquire); - if (nullptr != ptr && - !m_epochReclaimNode[index].compare_exchange_strong( - ptr, nullptr, std::memory_order_release, - std::memory_order_acquire)) { - continue; + if (nullptr != ptr) { + if (!m_epochReclaimNode[index].compare_exchange_strong( + ptr, nullptr, std::memory_order_release, + std::memory_order_acquire)) { + continue; + } + + freeRetiredNodeList(ptr); } - - freeRetiredNodeList(ptr); } } @@ -729,14 +732,15 @@ void Dmn_BlockingQueue_Lf::leaveInflightGuardFnc( do { auto ptr = m_epochReclaimNode[epochIndex].load(std::memory_order_acquire); - if (nullptr != ptr && - !m_epochReclaimNode[epochIndex].compare_exchange_strong( - ptr, nullptr, std::memory_order_release, - std::memory_order_acquire)) { - continue; - } + if (nullptr != ptr) { + if (!m_epochReclaimNode[epochIndex].compare_exchange_strong( + ptr, nullptr, std::memory_order_release, + std::memory_order_acquire)) { + continue; + } - freeRetiredNodeList(ptr); + freeRetiredNodeList(ptr); + } break; } while (true); diff --git a/include/dmn-blockingqueue-mt.hpp b/include/dmn-blockingqueue-mt.hpp index 7cf103f..690d8bf 100644 --- a/include/dmn-blockingqueue-mt.hpp +++ b/include/dmn-blockingqueue-mt.hpp @@ -238,9 +238,6 @@ Dmn_BlockingQueue_Mt::~Dmn_BlockingQueue_Mt() noexcept try { if (!isShutdown()) { shutdown(); } - - // make sure that all api call within the inflight guard exits. - this->waitForEmptyInflight(); } catch (...) { // Destructors must be noexcept: swallow exceptions. return; @@ -300,10 +297,15 @@ void Dmn_BlockingQueue_Mt::pushImpl(U &&item) { } template void Dmn_BlockingQueue_Mt::shutdown() { + // 1. set shutdown flag Dmn_BlockingQueue, T>::shutdown(); + // 2. notify all the threads blocking waiting m_empty_cond.notify_all(); m_not_empty_cond.notify_all(); + + // 3. wait for all threads that already "entered the epoch" to leave + this->waitForEmptyInflight(); } template auto Dmn_BlockingQueue_Mt::waitForEmpty() -> uint64_t { diff --git a/include/dmn-blockingqueue.hpp b/include/dmn-blockingqueue.hpp index 0423c1f..3606d4d 100644 --- a/include/dmn-blockingqueue.hpp +++ b/include/dmn-blockingqueue.hpp @@ -109,12 +109,10 @@ template class Dmn_BlockingQueue { virtual void push(const T &item) final; /** - * @brief Enqueue an rvalue item into the tail of the queue, preferring move + * @brief Enqueue an rvalue item into the tail of the queue via move * semantics. * - * @param item The rvalue item to be enqueued. Implementations may - * internally fall back to copying (e.g., when using std::move_if_noexcept for - * types with throwing move constructors). + * @param item The rvalue item to be enqueued. */ virtual void push(T &&item) final; diff --git a/include/dmn-inflight-guard.hpp b/include/dmn-inflight-guard.hpp index 803c0fa..d282804 100644 --- a/include/dmn-inflight-guard.hpp +++ b/include/dmn-inflight-guard.hpp @@ -96,7 +96,7 @@ * - Override isInflightGuardClosed() to reflect shutdown state. * - Optionally override enterInflightGuardFnc()/leaveInflightGuardFnc() if * per-call bookkeeping is required (e.g. epoch-based reclamation). - * - Public APIs should acquire a ticket early (typically at the start of + * - Public APIs should acquire a ticket early (e.g. at the start of * push/pop) and keep it alive until the function returns. * - shutdown()/destructor should close the inflight guard first, then call * waitForEmptyInflight() before freeing shared resources that in-flight calls @@ -214,7 +214,7 @@ template class Dmn_Inflight_Guard { virtual void leaveInflightGuardFnc(const T &) noexcept {} - auto inflight_count() const -> uint64_t { + virtual auto inflight_count() const -> uint64_t final { return m_inflight_count.load(std::memory_order_acquire); }