Skip to content

Commit 30366d7

Browse files
author
Chee Bin Hoh
committed
improve pub_sub
1 parent d2ed5d5 commit 30366d7

1 file changed

Lines changed: 15 additions & 28 deletions

File tree

include/dmn-pub-sub.hpp

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
* Copyright © 2025 Chee Bin HOH. All rights reserved.
33
*
44
* @file dmn-pub-sub.hpp
5-
* @brief Lightweight publish/subscribe helpers built on top of Dmn_Async.
5+
* @brief Lightweight publish/subscribe helpers adaptor class as wrapper.
66
*
77
* Overview
88
* --------
9-
* - This header provides a small, efficient publish/subscribe utility:
9+
* - This header provides a small, efficient publish/subscribe adaptar classes:
1010
* * Dmn_Pub<T> publishes items of type T.
1111
* * Dmn_Pub<T>::Dmn_Sub is the subscriber interface that receives items.
1212
*
@@ -15,30 +15,29 @@
1515
* - Adapter - it allows other subclasses to be adapted as publishers or
1616
* subscribers.
1717
* - Observer - it defines one-to-many dependencies between objects so that
18-
* when one object publishes a state change, all dependent
19-
* subscribers are notified.
18+
* when one object publishes a message, all dependent subscribers
19+
* are notified.
2020
*
2121
* Key design goals
2222
* ----------------
2323
* - Simplicity: minimal API to publish, register and unregister subscribers.
2424
* - Correctness: clear ownership and lifetime semantics, safe cleanup on
2525
* destruction.
2626
* - Concurrency: deliveries occur asynchronously and subscribers process
27-
* notifications
27+
* notifications.
2828
*
2929
* Threading and execution model
3030
* -----------------------------
31-
* - Both Dmn_Pub and Dmn_Sub derive from Dmn_Async. Each Dmn_Sub object has its
31+
* - The Dmn_Pub is derivade from Dmn_Async. Each Dmn_Pub object has its
3232
* own singleton asynchronous execution context as provided by Dmn_Async.
3333
* - publish(const T&) schedules a delivery task in the publisher's async
3434
* context. That task (publishInternal) performs buffering and schedules per-
35-
* subscriber deliveries into each subscriber's async context via
36-
* Dmn_Sub::notifyInternal.
35+
* subscriber deliveries into each subscriber's publish() method.
3736
* - notify(const T&) is a pure virtual callback implemented by subscriber
3837
* subclasses and is always invoked directly in async publisher context, and
3938
* it is up to subscriber to decide how it adapted notify() method being
40-
* called, note that subscriber is adviced to process the notification in
41-
* its context.
39+
* called, note that subscriber is adviced to copy the notification to
40+
* its context before perform expensive computation.
4241
*
4342
* Synchronization
4443
* ---------------
@@ -67,6 +66,8 @@
6766
* registered) and waits for any pending asynchronous tasks to finish so a
6867
* destroyed subscriber will not receive further notifications.
6968
* - Dmn_Pub destructor: unregister all the subscribers still dangling.
69+
* - The Dmn_Pub and Dmn_Sub does not maintain object lifetime and ownership,
70+
* these are left it to the client of the classes.
7071
*
7172
* Error handling and exception safety
7273
* -----------------------------------
@@ -148,15 +149,6 @@ class Dmn_Pub : public Dmn_Async<QueueType> {
148149
friend class Dmn_Pub;
149150

150151
private:
151-
/**
152-
* @brief Internal helper called by Dmn_Pub to schedule delivery into the
153-
* subscriber's asynchronous context. Do not call from user code; implement
154-
* notify() instead.
155-
*
156-
* @param item The data item to deliver.
157-
*/
158-
void notifyInternal(const T &item);
159-
160152
ssize_t m_replayQuantity{
161153
-1}; // -1 resend all, 0 no resend, resend up to number
162154
Dmn_Pub *m_pub{};
@@ -209,7 +201,7 @@ class Dmn_Pub : public Dmn_Async<QueueType> {
209201
*
210202
* Register a subscriber of class interface/subclass from Dmn_Sub with
211203
* this publisher. After registration, items in the publisher's buffer are
212-
* replayed to the subscriber (via notifyInternal).
204+
* replayed to the subscriber.
213205
*
214206
* The immediate semantics (synchronous registration) allow callers to rely
215207
* on the subscriber being registered when the call returns.
@@ -223,7 +215,7 @@ class Dmn_Pub : public Dmn_Async<QueueType> {
223215
/**
224216
* @brief Register a subscriber of class interface/subclass from Dmn_Sub with
225217
* this publisher. After registration, items in the publisher's buffer are
226-
* replayed to the subscriber (via notifyInternal).
218+
* replayed to the subscriber.
227219
*
228220
* The immediate semantics (synchronous registration) allow callers to rely
229221
* on the subscriber being registered when the call returns.
@@ -275,11 +267,6 @@ Dmn_Pub<T, QueueType>::Dmn_Sub::~Dmn_Sub() noexcept try {
275267
return;
276268
}
277269

278-
template <typename T, template <class> class QueueType>
279-
void Dmn_Pub<T, QueueType>::Dmn_Sub::notifyInternal(const T &item) {
280-
this->notify(item);
281-
}
282-
283270
// class Dmn_Pub
284271
template <typename T, template <class> class QueueType>
285272
Dmn_Pub<T, QueueType>::Dmn_Pub(std::string_view name, size_t capacity,
@@ -347,7 +334,7 @@ void Dmn_Pub<T, QueueType>::publishInternal(const T &item) {
347334

348335
for (auto &sub : m_subscribers) {
349336
if (!m_filter_fn || m_filter_fn(sub.get(), item)) {
350-
sub->notifyInternal(item);
337+
sub->notify(item);
351338
}
352339
}
353340
} // method publishInternal()
@@ -378,7 +365,7 @@ void Dmn_Pub<T, QueueType>::registerSubscriber(std::shared_ptr<Dmn_Sub> sub) {
378365
if (numberOfItemsToBeSkipped > 0) {
379366
numberOfItemsToBeSkipped--;
380367
} else {
381-
sub->notifyInternal(item);
368+
sub->notify(item);
382369
}
383370
}
384371
});

0 commit comments

Comments
 (0)