diff --git a/plans/20260401_01_batch_queue_associations.md b/plans/20260401_01_batch_queue_associations.md new file mode 100644 index 0000000000..07f794df2f --- /dev/null +++ b/plans/20260401_01_batch_queue_associations.md @@ -0,0 +1,126 @@ +# Server: batch queue service associations + +When a batch of SUB or NSUB commands arrives from a service client, each command that needs a new or removed service association calls `setQueueService` individually - one DB write per command. For 135 commands per batch, that's 135 individual `UPDATE msg_queues` queries. + +## Goal + +Reduce to at most 2 DB queries per batch (one for rcv associations, one for ntf associations), using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated. + +Also fuse message pre-fetch and association batching into a single batch preparation step with a clean contract. + +## Contract + +```haskell +prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ())))) +``` + +`Left e` = batch-level failure (message pre-fetch or association query failed entirely). All SUBs/NSUBs in the batch get this error. + +`Right map` = per-queue results as a tuple: +- `Maybe Message` - pre-fetched message for SUB queues, `Nothing` for NSUB or no message +- `Maybe (Either ErrorType ())` - association result. `Nothing` = no update needed. `Just (Right ())` = update succeeded. `Just (Left e)` = update failed for this queue. + +One map, one lookup per queue. `processCommand` passes both values to `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`. + +Queues not in the map (non-SUB/NSUB commands, failed verification) are not affected. + +## prepareBatch implementation + +One accumulating fold over the batch, collecting three lists: +- `subMsgQs :: [StoreQueue s]` - SUB queues for message pre-fetch +- `rcvAssocQs :: [StoreQueue s]` - SUB queues needing `rcv_service_id` update (`clntServiceId /= rcvServiceId qr`) +- `ntfAssocQs :: [StoreQueue s]` - NSUB queues needing `ntf_service_id` update (`clntServiceId /= ntfServiceId` from `NtfCreds`) + +Classification reads from the already-loaded `QueueRec` in `VerifiedTransmission` - no extra DB query. + +Then three store calls (each skipped if its list is empty): +1. `tryPeekMsgs ms subMsgQs` -> `Map RecipientId Message` +2. `setRcvQueueServices (queueStore ms) clntServiceId rcvAssocQs` -> `Set RecipientId` +3. `setNtfQueueServices (queueStore ms) clntServiceId ntfAssocQs` -> `Set RecipientId` + +Then one pass to merge results into `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`: +- For each SUB queue: `(M.lookup rId msgMap, assocResult rId rcvUpdated rcvAssocQs)` +- For each NSUB queue: `(Nothing, assocResult rId ntfUpdated ntfAssocQs)` + +Where `assocResult rId updated assocQs` = if the queue was in `assocQs` (needed update), then `Just (Right ())` if `rId` is in `updated`, else `Just (Left AUTH)`. If not in `assocQs` (no update needed), `Nothing`. + +If any of the three calls fails entirely, return `Left e`. + +## Store interface + +Replace the polymorphic `setQueueServices` with two plain functions in `QueueStoreClass`: + +```haskell +setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) +setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) +``` + +No `SParty p` polymorphism. Each function knows its column. + +### Postgres implementation + +`setRcvQueueServices`: +```sql +UPDATE msg_queues SET rcv_service_id = ? +WHERE recipient_id IN ? AND deleted_at IS NULL +RETURNING recipient_id +``` + +`setNtfQueueServices`: +```sql +UPDATE msg_queues SET ntf_service_id = ? +WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL +RETURNING recipient_id +``` + +After each batch query, for each queue in the returned set: +1. Read QueueRec TVar, update with new serviceId +2. Write store log entry + +### STM implementation + +Loop over queues, call existing per-item logic, collect succeeded `RecipientId`s into a Set. + +## Downstream changes in Server.hs + +### processCommand + +Gains one parameter: `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`. + +SUB case: `M.lookup entId prepared` gives `Just (msg_, assocResult)` or `Nothing`. Pass both to `subscribeQueueAndDeliver`. + +NSUB case: `M.lookup entId prepared` gives `Just (Nothing, assocResult)` or `Nothing`. Pass `assocResult` to `subscribeNotifications`. + +Forwarded commands: pass `M.empty`. + +### subscribeQueueAndDeliver + +Takes `Maybe Message` and `Maybe (Either ErrorType ())` as before. No change in how it uses them. + +### sharedSubscribeQueue + +Takes `Maybe (Either ErrorType ())`. On paths needing association update: +- `Just (Left e)` -> return error +- `Just (Right ())` -> skip `setQueueService`, proceed with STM work +- `Nothing` -> no update needed, proceed with existing logic + +## Implementation order (top-down) + +1. Define the `prepareBatch` contract and thread one map through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` (Server.hs) +2. Implement `prepareBatch` with the fold, three calls, and merge (Server.hs) +3. Add `setRcvQueueServices` and `setNtfQueueServices` to `QueueStoreClass` (Types.hs) +4. Implement for Postgres with batch `UPDATE ... RETURNING` (Postgres.hs) +5. Implement for STM as loop (STM.hs) +6. Implement for Journal as delegation (Journal.hs) + +At step 2, store functions can initially be stubs returning empty sets. Steps 3-6 fill in the real implementations. + +## Files changed + +| File | Change | +|---|---| +| `src/Simplex/Messaging/Server.hs` | `prepareBatch` with fold + merge; one map parameter through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` | +| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setRcvQueueServices`, `setNtfQueueServices` to `QueueStoreClass` | +| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement with batch `UPDATE ... RETURNING` + per-item TVar/log updates | +| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement as loop | +| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Delegate to underlying store | diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 66100e97d8..1b7d920ac5 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1366,19 +1366,37 @@ client labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" let THandleParams {thVersion} = thParams' clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') - process msgMap t acc@(rs, msgs) = + process batchSubs t acc@(rs, msgs) = (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_))) - <$> processCommand clntServiceId thVersion msgMap t + <$> processCommand clntServiceId thVersion batchSubs t forever $ do batch <- atomically (readTBQueue rcvQ) - msgMap <- prefetchMsgs batch - foldrM (process msgMap) ([], []) batch + batchSubs <- prepareBatchSubs clntServiceId batch + foldrM (process batchSubs) ([], []) batch >>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_) where - prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message)) - prefetchMsgs batch = - let subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch] - in if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs + prepareBatchSubs :: + Maybe ServiceId -> + NonEmpty (VerifiedTransmission s) -> + M s (Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ()))) + prepareBatchSubs clntServiceId_ batch = do + let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldr partitionSubs ([], [], []) batch + partitionSubs t (msgQs, rcvQs, ntfQs) = case t of + (Just (q, qr), (_, _, Cmd SRecipient SUB)) + | clntServiceId_ /= rcvServiceId qr -> (q : msgQs, q : rcvQs, ntfQs) + | otherwise -> (q : msgQs, rcvQs, ntfQs) + (Just (q, qr), (_, _, Cmd SNotifier NSUB)) + | clntServiceId_ /= (notifier qr >>= ntfServiceId) -> (msgQs, rcvQs, q : ntfQs) + _ -> (msgQs, rcvQs, ntfQs) + liftIO $ runExceptT $ do + rcvAssocs <- ifNotNull rcvAssocQs $ setService SRecipientService clntServiceId_ + ntfAssocs <- ifNotNull ntfAssocQs $ setService SNotifierService clntServiceId_ + msgs <- ifNotNull subMsgQs $ tryPeekMsgs ms + pure (msgs, rcvAssocs, ntfAssocs) + where + ifNotNull qs f = if null qs then pure M.empty else f qs + setService :: (PartyI p, ServiceParty p) => SParty p -> Maybe ServiceId -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId (Either ErrorType ())) + setService party sId = ExceptT . setQueueServices (queueStore ms) party sId processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage) processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of @@ -1460,8 +1478,8 @@ client mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) - processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of + processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) + processCommand clntServiceId clntVersion batchSubs (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> case command of SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k @@ -1472,7 +1490,9 @@ client LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of - Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications q ntfCreds + Just (q, QueueRec {notifier = Just ntfCreds}) -> + either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds) + $ batchSubs >>= \(_, _, ntfAssocs) -> sequence (M.lookup (recipientId q) ntfAssocs) _ -> pure $ ERR INTERNAL Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash) @@ -1485,9 +1505,9 @@ client pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth Cmd SRecipient command -> case command of - SUB -> case msgMap of + SUB -> case batchSubs >>= \(msgs, rcvAssocs, _) -> sequence (M.lookup entId rcvAssocs) $> msgs of Left e -> pure $ Just (err e, Nothing) - Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) + Right msgs -> withQueue' $ subscribeQueueAndDeliver $ M.lookup entId msgs GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -1632,9 +1652,7 @@ client subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} = liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> - sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case - Left e -> pure (err e, Nothing) - Right s -> deliver s + deliver =<< sharedSubscribeQueue q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices Just s@Sub {subThread} -> do stats <- asks serverStats case subThread of @@ -1735,26 +1753,22 @@ client else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q) subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg - subscribeNotifications q NtfCreds {ntfServiceId} = - sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case - Left e -> pure $ ERR e - Right (hasSub, _) -> do - when (isNothing clntServiceId) $ - asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub) - pure $ SOK clntServiceId + subscribeNotifications q NtfCreds {ntfServiceId} = do + (hasSub, _) <- sharedSubscribeQueue q ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices + when (isNothing clntServiceId) $ + asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub) + pure $ SOK clntServiceId sharedSubscribeQueue :: - (PartyI p, ServiceParty p) => StoreQueue s -> - SParty p -> Maybe ServiceId -> ServerSubscribers s -> (Client s -> TMap QueueId sub) -> (Client s -> TVar (Int64, IdsHash)) -> STM sub -> (ServerStats -> ServiceStats) -> - M s (Either ErrorType (Bool, Maybe sub)) - sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do + M s (Bool, Maybe sub) + sharedSubscribeQueue q queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do stats <- asks serverStats let incSrvStat sel = incStat $ sel $ servicesSel stats writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId) @@ -1768,25 +1782,23 @@ client incSrvStat srvSubCount incSrvStat srvSubQueues incSrvStat srvAssocDuplicate - pure $ Right (hasSub, Nothing) - | otherwise -> runExceptT $ do - -- new or updated queue-service association - ExceptT $ setQueueService (queueStore ms) q party (Just serviceId) + pure (hasSub, Nothing) + | otherwise -> do + -- association already done in prepareBatchSubs hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub atomically writeSub - liftIO $ do - unless hasSub $ incSrvStat srvSubCount - incSrvStat srvSubQueues - incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId + unless hasSub $ incSrvStat srvSubCount + incSrvStat srvSubQueues + incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId pure (hasSub, Nothing) where hasServiceSub = ((0 /=) . fst) <$> readTVar (clientServiceSubs clnt) -- This function is used when queue association with the service is created. - incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDs hash + incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDS hash Nothing -> case queueServiceId of - Just _ -> runExceptT $ do - ExceptT $ setQueueService (queueStore ms) q party Nothing - liftIO $ incSrvStat srvAssocRemoved + Just _ -> do + -- unassociation already done in prepareBatchSubs + incSrvStat srvAssocRemoved -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions. -- For notification service it can only be Just if storage and session states diverge. r <- atomically $ getSubscription >>= newSub @@ -1795,7 +1807,7 @@ client Nothing -> do r@(hasSub, _) <- atomically $ getSubscription >>= newSub unless hasSub $ atomically writeSub - pure $ Right r + pure r where getSubscription = TM.lookup entId $ clientSubs clnt newSub = \case @@ -2094,7 +2106,7 @@ client -- rejectOrVerify filters allowed commands, no need to repeat it here. -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types). -- `fst` removes empty message that is only returned for `SUB` command - Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'') + Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right (M.empty, M.empty, M.empty)) t'') -- encode response r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index c65660c93b..185c113b78 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -353,6 +353,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where {-# INLINE getCreateService #-} setQueueService = withQS setQueueService {-# INLINE setQueueService #-} + setQueueServices = withQS setQueueServices + {-# INLINE setQueueServices #-} getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s)) {-# INLINE getQueueNtfServices #-} getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s)) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index a8c8c040aa..ce1cf25db4 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -91,7 +91,7 @@ import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPServiceRole (..)) -import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>)) +import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>), ($>>=)) import System.Exit (exitFailure) import System.IO (IOMode (..), hFlush, stdout) import UnliftIO.STM @@ -504,6 +504,32 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where atomically $ writeTVar (queueRec sq) $ Just q' withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId + setQueueServices :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (M.Map RecipientId (Either ErrorType ()))) + setQueueServices _ _ _ [] = pure $ Right M.empty + setQueueServices st party serviceId qs = E.uninterruptibleMask_ $ runExceptT $ do + updated <- S.fromList <$> withDB' "setQueueServices" st (\db -> + map fromOnly <$> DB.query db updateQuery (serviceId, In (map recipientId qs))) + results <- liftIO $ forM qs $ \sq -> do + let rId = recipientId sq + (rId,) <$> if S.member rId updated + then readQueueRecIO (queueRec sq) $>>= \q -> do + atomically $ writeTVar (queueRec sq) $ Just $ updateRec q + withLog "setQueueServices" st $ \sl -> logQueueService sl rId party serviceId + pure $ Right () + else pure $ Left AUTH + pure $ M.fromList results + where + updateQuery = case party of + SRecipientService -> + "UPDATE msg_queues SET rcv_service_id = ? WHERE recipient_id IN ? AND deleted_at IS NULL RETURNING recipient_id" + SNotifierService -> + "UPDATE msg_queues SET ntf_service_id = ? WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL RETURNING recipient_id" + updateRec q = case party of + SRecipientService -> q {rcvServiceId = serviceId} + SNotifierService -> case notifier q of + Just nc -> q {notifier = Just nc {ntfServiceId = serviceId}} + Nothing -> q + getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do snIds <- diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 3a236076c4..583bbf3384 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -337,6 +337,10 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where mapM_ (removeServiceQueue st serviceSel qId) prevSrvId mapM_ (addServiceQueue st serviceSel qId) serviceId + setQueueServices st party serviceId qs = Right . M.fromList <$> mapM setQueue qs + where + setQueue sq = (recipientId sq,) <$> setQueueService st sq party serviceId + getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = do ss <- readTVarIO (services st) diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 7d1d439bdc..415a5f33cb 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -16,6 +16,7 @@ import Control.Concurrent.STM import Control.Monad import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) +import Data.Map.Strict (Map) import Data.Text (Text) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore @@ -51,6 +52,7 @@ class StoreQueueClass q => QueueStoreClass q s where deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec) getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId) setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ()) + setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (Map RecipientId (Either ErrorType ()))) getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash))