Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 15 additions & 28 deletions include/dmn-pub-sub.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
* Copyright © 2025 Chee Bin HOH. All rights reserved.
*
* @file dmn-pub-sub.hpp
* @brief Lightweight publish/subscribe helpers built on top of Dmn_Async.
* @brief Lightweight publish/subscribe helpers adaptor class as wrapper.
*
* Overview
* --------
* - This header provides a small, efficient publish/subscribe utility:
* - This header provides a small, efficient publish/subscribe adaptar classes:
* * Dmn_Pub<T> publishes items of type T.
* * Dmn_Pub<T>::Dmn_Sub is the subscriber interface that receives items.
*
Expand All @@ -15,30 +15,29 @@
* - Adapter - it allows other subclasses to be adapted as publishers or
* subscribers.
* - Observer - it defines one-to-many dependencies between objects so that
* when one object publishes a state change, all dependent
* subscribers are notified.
* when one object publishes a message, all dependent subscribers
* are notified.
*
* Key design goals
* ----------------
* - Simplicity: minimal API to publish, register and unregister subscribers.
* - Correctness: clear ownership and lifetime semantics, safe cleanup on
* destruction.
* - Concurrency: deliveries occur asynchronously and subscribers process
* notifications
* notifications.
*
* Threading and execution model
* -----------------------------
* - Both Dmn_Pub and Dmn_Sub derive from Dmn_Async. Each Dmn_Sub object has its
* - The Dmn_Pub is derivade from Dmn_Async. Each Dmn_Pub object has its
* own singleton asynchronous execution context as provided by Dmn_Async.
* - publish(const T&) schedules a delivery task in the publisher's async
* context. That task (publishInternal) performs buffering and schedules per-
* subscriber deliveries into each subscriber's async context via
* Dmn_Sub::notifyInternal.
* subscriber deliveries into each subscriber's publish() method.
* - notify(const T&) is a pure virtual callback implemented by subscriber
* subclasses and is always invoked directly in async publisher context, and
* it is up to subscriber to decide how it adapted notify() method being
* called, note that subscriber is adviced to process the notification in
* its context.
* called, note that subscriber is adviced to copy the notification to
* its context before perform expensive computation.
*
* Synchronization
* ---------------
Expand Down Expand Up @@ -67,6 +66,8 @@
* registered) and waits for any pending asynchronous tasks to finish so a
* destroyed subscriber will not receive further notifications.
* - Dmn_Pub destructor: unregister all the subscribers still dangling.
* - The Dmn_Pub and Dmn_Sub does not maintain object lifetime and ownership,
* these are left it to the client of the classes.
*
* Error handling and exception safety
* -----------------------------------
Expand Down Expand Up @@ -148,15 +149,6 @@ class Dmn_Pub : public Dmn_Async<QueueType> {
friend class Dmn_Pub;

private:
/**
* @brief Internal helper called by Dmn_Pub to schedule delivery into the
* subscriber's asynchronous context. Do not call from user code; implement
* notify() instead.
*
* @param item The data item to deliver.
*/
void notifyInternal(const T &item);

ssize_t m_replayQuantity{
-1}; // -1 resend all, 0 no resend, resend up to number
Dmn_Pub *m_pub{};
Expand Down Expand Up @@ -209,7 +201,7 @@ class Dmn_Pub : public Dmn_Async<QueueType> {
*
* Register a subscriber of class interface/subclass from Dmn_Sub with
* this publisher. After registration, items in the publisher's buffer are
* replayed to the subscriber (via notifyInternal).
* replayed to the subscriber.
*
* The immediate semantics (synchronous registration) allow callers to rely
* on the subscriber being registered when the call returns.
Expand All @@ -223,7 +215,7 @@ class Dmn_Pub : public Dmn_Async<QueueType> {
/**
* @brief Register a subscriber of class interface/subclass from Dmn_Sub with
* this publisher. After registration, items in the publisher's buffer are
* replayed to the subscriber (via notifyInternal).
* replayed to the subscriber.
*
* The immediate semantics (synchronous registration) allow callers to rely
* on the subscriber being registered when the call returns.
Expand Down Expand Up @@ -275,11 +267,6 @@ Dmn_Pub<T, QueueType>::Dmn_Sub::~Dmn_Sub() noexcept try {
return;
}

template <typename T, template <class> class QueueType>
void Dmn_Pub<T, QueueType>::Dmn_Sub::notifyInternal(const T &item) {
this->notify(item);
}

// class Dmn_Pub
template <typename T, template <class> class QueueType>
Dmn_Pub<T, QueueType>::Dmn_Pub(std::string_view name, size_t capacity,
Expand Down Expand Up @@ -347,7 +334,7 @@ void Dmn_Pub<T, QueueType>::publishInternal(const T &item) {

for (auto &sub : m_subscribers) {
if (!m_filter_fn || m_filter_fn(sub.get(), item)) {
sub->notifyInternal(item);
sub->notify(item);
}
}
} // method publishInternal()
Expand Down Expand Up @@ -378,7 +365,7 @@ void Dmn_Pub<T, QueueType>::registerSubscriber(std::shared_ptr<Dmn_Sub> sub) {
if (numberOfItemsToBeSkipped > 0) {
numberOfItemsToBeSkipped--;
} else {
sub->notifyInternal(item);
sub->notify(item);
}
}
});
Expand Down
Loading