From 4b132a1e18292dc27228f7f78716f1ebe89a4209 Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Thu, 2 Apr 2026 09:02:16 +0000 Subject: [PATCH 1/6] smp: batch queue association updates on subscriptions --- plans/20260401_01_batch_queue_associations.md | 190 ++++++++++++++++++ src/Simplex/Messaging/Server.hs | 88 ++++---- .../Messaging/Server/MsgStore/Journal.hs | 2 + .../Messaging/Server/QueueStore/Postgres.hs | 2 + .../Messaging/Server/QueueStore/STM.hs | 2 + .../Messaging/Server/QueueStore/Types.hs | 2 + 6 files changed, 247 insertions(+), 39 deletions(-) create mode 100644 plans/20260401_01_batch_queue_associations.md diff --git a/plans/20260401_01_batch_queue_associations.md b/plans/20260401_01_batch_queue_associations.md new file mode 100644 index 0000000000..b33002c41e --- /dev/null +++ b/plans/20260401_01_batch_queue_associations.md @@ -0,0 +1,190 @@ +# Server: batch queue service associations + +When a batch of SUB or NSUB commands arrives from a service client, each command that needs a new or removed service association calls `setQueueService` individually - one DB write per command. For 135 commands per batch, that's 135 individual `UPDATE msg_queues` queries. + +## Goal + +Reduce to 1 DB query per batch, using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated. + +`clntServiceId` is per-client (not per-queue), so all queues in a batch that need association changes share the same target value. The per-queue decision is binary: update or not. + +```haskell +type NeedsAssocUpdate = Bool +``` + +## Current code + +### `sharedSubscribeQueue` (Server.hs:1757-1805) + +Called per SUB/NSUB command. Based on `clntServiceId` and `queueServiceId` from `QueueRec`: + +- `clntServiceId == Just sId`, `queueServiceId == Just sId` (line 1763): Already associated. No DB write. STM + stats only. + +- `clntServiceId == Just sId`, `queueServiceId /= Just sId` (line 1772): New/updated association. Calls `setQueueService q party (Just sId)` - **DB WRITE**. Then STM + stats. + +- `clntServiceId == Nothing`, `queueServiceId == Just _` (line 1787): Removing association. Calls `setQueueService q party Nothing` - **DB WRITE**. Then STM + stats. + +- `clntServiceId == Nothing`, `queueServiceId == Nothing` (line 1795): No service. No DB write. STM only. + +### Where `sharedSubscribeQueue` is called from + +Only from the `client` function's `foldrM` loop in `Server.hs` (via `processCommand` -> `subscribeQueueAndDeliver` or `subscribeNotifications`). The forwarded command handler (line 2094) only processes sender commands, never SUB/NSUB. So `prepareBatch` always runs before `sharedSubscribeQueue`. + +### `setQueueService` for Postgres (QueueStore/Postgres.hs:484-505) + +Per queue: +1. `withQueueRec sq` - reads QueueRec TVar under queue lock, fails if deleted +2. Checks if already set to target value - returns immediately if so +3. `assertUpdated $ withDB' ... DB.execute "UPDATE ..."` - one DB query, asserts 1 row affected +4. `atomically $ writeTVar (queueRec sq) $ Just q'` - updates in-memory QueueRec +5. `withLog ... logQueueService` - writes store log entry + +### `setQueueService` for STM (QueueStore/STM.hs:312-338) + +Per queue: +1. `atomically (readQueueRec qr $>>= setService)` - reads QueueRec, updates TVar, updates per-service queue sets +2. `$>> withLog ... logQueueService` - writes store log entry + +## Implementation (top-down) + +### Step 1: Extend batch preparation in the `client` function (Server.hs) + +Currently (Server.hs:1372-1381): +```haskell +forever $ do + batch <- atomically (readTBQueue rcvQ) + msgMap <- prefetchMsgs batch + foldrM (process msgMap) ([], []) batch + >>= ... +``` + +Rename `prefetchMsgs` to `prepareBatch`. It returns an additional `Map RecipientId (Either ErrorType ())` for association results. + +```haskell +forever $ do + batch <- atomically (readTBQueue rcvQ) + (msgMap, assocResults) <- prepareBatch batch + foldrM (process msgMap assocResults) ([], []) batch + >>= ... +``` + +`assocResults` contains entries only for queues that needed an association update. Keyed by `RecipientId`. `Right ()` means the update succeeded. `Left e` means it failed. + +### Step 2: Implement `prepareBatch` (Server.hs) + +Replaces current `prefetchMsgs`. Does three things: + +1. Collects SUB queues for message pre-fetch (existing `tryPeekMsgs` logic, unchanged). + +2. Classifies each SUB/NSUB queue's association need by reading `queueServiceId` from the already-loaded `QueueRec` in `VerifiedTransmission` and comparing with `clntServiceId`. Produces `NonEmpty (Either ErrorType NeedsAssocUpdate)` aligned with the batch. Error if `q_ = Nothing` for a SUB/NSUB command. `True` if the queue needs its association updated. `False` if no change needed. + +3. Collects `StoreQueue`s where classification produced `Right True`. If non-empty, calls `setQueueServices` with `clntServiceId` as target and this list. Gets back `Set RecipientId` of queues that were actually updated. + +4. Builds `assocResults :: Map RecipientId (Either ErrorType ())`: for each queue that needed an update (`Right True`), if its `recipientId` is in the returned set then `Right ()`, otherwise `Left AUTH`. + +### Step 3: Thread `assocResults` through `processCommand` (Server.hs:1463) + +Add parameter: +```haskell +processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> Map RecipientId (Either ErrorType ()) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) +``` + +In the SUB case, pass `M.lookup entId assocResults` to `subscribeQueueAndDeliver`. +In the NSUB case, pass `M.lookup entId assocResults` to `subscribeNotifications`. +In the forwarded command call (line 2094), pass `M.empty`. +All other commands ignore it. + +### Step 4: Thread through `subscribeQueueAndDeliver` (Server.hs:1631) and `subscribeNotifications` (Server.hs:1737) + +Both gain `assocResult :: Maybe (Either ErrorType ())` and pass it to `sharedSubscribeQueue`. + +`subscribeQueueAndDeliver` signature becomes: +```haskell +subscribeQueueAndDeliver :: Maybe Message -> Maybe (Either ErrorType ()) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage +``` + +### Step 5: Modify `sharedSubscribeQueue` (Server.hs:1757) + +Gains `assocResult :: Maybe (Either ErrorType ())`. + +Where the queue needs a new or changed association (line 1772), currently: +```haskell +| otherwise -> runExceptT $ do + ExceptT $ setQueueService (queueStore ms) q party (Just serviceId) + hasSub <- ... +``` + +Becomes: +```haskell +| otherwise -> case assocResult of + Just (Left e) -> pure $ Left e + _ -> runExceptT $ do + hasSub <- ... +``` + +`Just (Left e)` means the batch update failed for this queue - return the error. +`Just (Right ())` means the batch update succeeded - skip `setQueueService`, proceed with STM work. +`Nothing` cannot happen here because `prepareBatch` always runs before this code and classifies every SUB/NSUB queue. + +Same change where removing association (line 1787): +```haskell +Just _ -> case assocResult of + Just (Left e) -> pure $ Left e + _ -> runExceptT $ do + liftIO $ incSrvStat srvAssocRemoved + ... +``` + +Queues that are already associated correctly or have no service involvement have `assocResult = Nothing` (not in the map). These paths don't call `setQueueService` today, so nothing changes for them. + +### Step 6: Add `setQueueServices` to `QueueStoreClass` (QueueStore/Types.hs:53) + +```haskell +setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Set RecipientId) +``` + +Takes target `serviceId` and list of queues. Returns set of `RecipientId`s that were actually updated in the DB. + +### Step 7: Postgres implementation (QueueStore/Postgres.hs) + +For `SRecipientService`: +```sql +UPDATE msg_queues SET rcv_service_id = ? +WHERE recipient_id IN ? AND deleted_at IS NULL +RETURNING recipient_id +``` + +For `SNotifierService`: +```sql +UPDATE msg_queues SET ntf_service_id = ? +WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL +RETURNING recipient_id +``` + +Build `Set RecipientId` from RETURNING rows. + +After the batch query, for each queue whose `recipientId` is in the returned set: +1. Read `QueueRec` from TVar, update with new `serviceId` (same as `updateQueueRec` at line 502-504) +2. Write store log entry (same as `withLog` at line 505) + +Queues not in the returned set are not updated (deleted between verification and UPDATE). The caller sees them absent from the set and produces `Left AUTH`. + +No per-queue lock needed: the batch UPDATE is a single SQL statement (Postgres handles row-level locking internally), and SUB/NSUB processing is single-threaded per connected client. + +### Step 8: STM implementation (QueueStore/STM.hs) + +Loop over queues. For each: +1. Run existing `setService` STM logic from `setQueueService` (line 319-334): read QueueRec, update TVar, update per-service queue sets +2. If succeeded, add `recipientId` to result set +3. Write store log entry + +Return accumulated `Set RecipientId`. + +## Files changed + +| File | Change | +|---|---| +| `src/Simplex/Messaging/Server.hs` | Rename `prefetchMsgs` to `prepareBatch` adding classification and `setQueueServices` call. Thread `assocResults` through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`. Replace `setQueueService` calls with `assocResult` check. | +| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setQueueServices` to `QueueStoreClass` | +| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement `setQueueServices` with batch `UPDATE ... RETURNING` + per-item TVar and store log updates | +| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement `setQueueServices` as loop over existing STM logic | diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 66100e97d8..9395f298fd 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1366,19 +1366,25 @@ client labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" let THandleParams {thVersion} = thParams' clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') - process msgMap t acc@(rs, msgs) = + process msgMap assocResults t acc@(rs, msgs) = (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_))) - <$> processCommand clntServiceId thVersion msgMap t + <$> processCommand clntServiceId thVersion msgMap assocResults t forever $ do batch <- atomically (readTBQueue rcvQ) - msgMap <- prefetchMsgs batch - foldrM (process msgMap) ([], []) batch + (msgMap, assocResults) <- prepareBatch clntServiceId batch + foldrM (process msgMap assocResults) ([], []) batch >>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_) where - prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message)) - prefetchMsgs batch = - let subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch] - in if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs + prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message), Map RecipientId (Either ErrorType ())) + prepareBatch clntServiceId_ batch = do + let ts = L.toList batch + subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- ts] + updateQs = [q | (Just (q, qr), (_, _, Cmd SRecipient SUB)) <- ts, clntServiceId_ /= rcvServiceId qr] + msgMap <- if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs + assocResults <- if null updateQs then pure M.empty else do + updated <- liftIO $ setQueueServices @(StoreQueue s) (queueStore ms) SRecipientService clntServiceId_ updateQs + pure $ M.fromList [(recipientId q, if S.member (recipientId q) updated then Right () else Left AUTH) | q <- updateQs] + pure (msgMap, assocResults) processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage) processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of @@ -1460,8 +1466,8 @@ client mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) - processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of + processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> Map RecipientId (Either ErrorType ()) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) + processCommand clntServiceId clntVersion msgMap assocResults (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> case command of SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k @@ -1472,7 +1478,7 @@ client LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of - Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications q ntfCreds + Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications (M.lookup entId assocResults) q ntfCreds _ -> pure $ ERR INTERNAL Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash) @@ -1487,7 +1493,7 @@ client case command of SUB -> case msgMap of Left e -> pure $ Just (err e, Nothing) - Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) + Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) (M.lookup entId assocResults) GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -1628,11 +1634,11 @@ client suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg) suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q - subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage - subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} = + subscribeQueueAndDeliver :: Maybe Message -> Maybe (Either ErrorType ()) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage + subscribeQueueAndDeliver msg_ assocResult q qr@QueueRec {rcvServiceId} = liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> - sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case + sharedSubscribeQueue assocResult q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case Left e -> pure (err e, Nothing) Right s -> deliver s Just s@Sub {subThread} -> do @@ -1734,9 +1740,9 @@ client then action q qr else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q) - subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg - subscribeNotifications q NtfCreds {ntfServiceId} = - sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case + subscribeNotifications :: Maybe (Either ErrorType ()) -> StoreQueue s -> NtfCreds -> M s BrokerMsg + subscribeNotifications assocResult q NtfCreds {ntfServiceId} = + sharedSubscribeQueue assocResult q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case Left e -> pure $ ERR e Right (hasSub, _) -> do when (isNothing clntServiceId) $ @@ -1745,6 +1751,7 @@ client sharedSubscribeQueue :: (PartyI p, ServiceParty p) => + Maybe (Either ErrorType ()) -> StoreQueue s -> SParty p -> Maybe ServiceId -> @@ -1754,7 +1761,7 @@ client STM sub -> (ServerStats -> ServiceStats) -> M s (Either ErrorType (Bool, Maybe sub)) - sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do + sharedSubscribeQueue assocResult q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do stats <- asks serverStats let incSrvStat sel = incStat $ sel $ servicesSel stats writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId) @@ -1769,29 +1776,32 @@ client incSrvStat srvSubQueues incSrvStat srvAssocDuplicate pure $ Right (hasSub, Nothing) - | otherwise -> runExceptT $ do - -- new or updated queue-service association - ExceptT $ setQueueService (queueStore ms) q party (Just serviceId) - hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub - atomically writeSub - liftIO $ do - unless hasSub $ incSrvStat srvSubCount - incSrvStat srvSubQueues - incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId - pure (hasSub, Nothing) + | otherwise -> case assocResult of + Just (Left e) -> pure $ Left e + _ -> runExceptT $ do + -- association already done in prepareBatch + hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub + atomically writeSub + liftIO $ do + unless hasSub $ incSrvStat srvSubCount + incSrvStat srvSubQueues + incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId + pure (hasSub, Nothing) where hasServiceSub = ((0 /=) . fst) <$> readTVar (clientServiceSubs clnt) -- This function is used when queue association with the service is created. - incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDs hash + incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDS hash Nothing -> case queueServiceId of - Just _ -> runExceptT $ do - ExceptT $ setQueueService (queueStore ms) q party Nothing - liftIO $ incSrvStat srvAssocRemoved - -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions. - -- For notification service it can only be Just if storage and session states diverge. - r <- atomically $ getSubscription >>= newSub - atomically writeSub - pure r + Just _ -> case assocResult of + Just (Left e) -> pure $ Left e + _ -> runExceptT $ do + -- unassociation already done in prepareBatch + liftIO $ incSrvStat srvAssocRemoved + -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions. + -- For notification service it can only be Just if storage and session states diverge. + r <- atomically $ getSubscription >>= newSub + atomically writeSub + pure r Nothing -> do r@(hasSub, _) <- atomically $ getSubscription >>= newSub unless hasSub $ atomically writeSub @@ -2094,7 +2104,7 @@ client -- rejectOrVerify filters allowed commands, no need to repeat it here. -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types). -- `fst` removes empty message that is only returned for `SUB` command - Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'') + Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) M.empty t'') -- encode response r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index c65660c93b..1707038248 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -353,6 +353,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where {-# INLINE getCreateService #-} setQueueService = withQS setQueueService {-# INLINE setQueueService #-} + setQueueServices st = withQS (\qs -> setQueueServices qs) st + {-# INLINE setQueueServices #-} getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s)) {-# INLINE getQueueNtfServices #-} getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s)) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index a8c8c040aa..81dc7271bf 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -504,6 +504,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where atomically $ writeTVar (queueRec sq) $ Just q' withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId + setQueueServices _ _ _ _ = pure S.empty -- TODO batch implementation + getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do snIds <- diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 3a236076c4..9eebc803e5 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -337,6 +337,8 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where mapM_ (removeServiceQueue st serviceSel qId) prevSrvId mapM_ (addServiceQueue st serviceSel qId) serviceId + setQueueServices _ _ _ _ = pure S.empty -- TODO loop implementation + getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = do ss <- readTVarIO (services st) diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 7d1d439bdc..aee602d52f 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -16,6 +16,7 @@ import Control.Concurrent.STM import Control.Monad import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) +import Data.Set (Set) import Data.Text (Text) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore @@ -51,6 +52,7 @@ class StoreQueueClass q => QueueStoreClass q s where deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec) getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId) setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ()) + setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Set RecipientId) getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash)) From 8b52f5011fdaa10c5fd5ff9cdbd139fb6159553b Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Thu, 2 Apr 2026 11:14:28 +0000 Subject: [PATCH 2/6] refactor to fused batching --- plans/20260401_01_batch_queue_associations.md | 208 ++++++------------ src/Simplex/Messaging/Server.hs | 59 +++-- .../Messaging/Server/MsgStore/Journal.hs | 6 +- .../Messaging/Server/QueueStore/Postgres.hs | 5 +- .../Messaging/Server/QueueStore/STM.hs | 3 +- .../Messaging/Server/QueueStore/Types.hs | 3 +- 6 files changed, 124 insertions(+), 160 deletions(-) diff --git a/plans/20260401_01_batch_queue_associations.md b/plans/20260401_01_batch_queue_associations.md index b33002c41e..07f794df2f 100644 --- a/plans/20260401_01_batch_queue_associations.md +++ b/plans/20260401_01_batch_queue_associations.md @@ -4,187 +4,123 @@ When a batch of SUB or NSUB commands arrives from a service client, each command ## Goal -Reduce to 1 DB query per batch, using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated. +Reduce to at most 2 DB queries per batch (one for rcv associations, one for ntf associations), using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated. -`clntServiceId` is per-client (not per-queue), so all queues in a batch that need association changes share the same target value. The per-queue decision is binary: update or not. +Also fuse message pre-fetch and association batching into a single batch preparation step with a clean contract. -```haskell -type NeedsAssocUpdate = Bool -``` - -## Current code - -### `sharedSubscribeQueue` (Server.hs:1757-1805) - -Called per SUB/NSUB command. Based on `clntServiceId` and `queueServiceId` from `QueueRec`: - -- `clntServiceId == Just sId`, `queueServiceId == Just sId` (line 1763): Already associated. No DB write. STM + stats only. - -- `clntServiceId == Just sId`, `queueServiceId /= Just sId` (line 1772): New/updated association. Calls `setQueueService q party (Just sId)` - **DB WRITE**. Then STM + stats. - -- `clntServiceId == Nothing`, `queueServiceId == Just _` (line 1787): Removing association. Calls `setQueueService q party Nothing` - **DB WRITE**. Then STM + stats. - -- `clntServiceId == Nothing`, `queueServiceId == Nothing` (line 1795): No service. No DB write. STM only. - -### Where `sharedSubscribeQueue` is called from - -Only from the `client` function's `foldrM` loop in `Server.hs` (via `processCommand` -> `subscribeQueueAndDeliver` or `subscribeNotifications`). The forwarded command handler (line 2094) only processes sender commands, never SUB/NSUB. So `prepareBatch` always runs before `sharedSubscribeQueue`. - -### `setQueueService` for Postgres (QueueStore/Postgres.hs:484-505) +## Contract -Per queue: -1. `withQueueRec sq` - reads QueueRec TVar under queue lock, fails if deleted -2. Checks if already set to target value - returns immediately if so -3. `assertUpdated $ withDB' ... DB.execute "UPDATE ..."` - one DB query, asserts 1 row affected -4. `atomically $ writeTVar (queueRec sq) $ Just q'` - updates in-memory QueueRec -5. `withLog ... logQueueService` - writes store log entry - -### `setQueueService` for STM (QueueStore/STM.hs:312-338) - -Per queue: -1. `atomically (readQueueRec qr $>>= setService)` - reads QueueRec, updates TVar, updates per-service queue sets -2. `$>> withLog ... logQueueService` - writes store log entry - -## Implementation (top-down) - -### Step 1: Extend batch preparation in the `client` function (Server.hs) - -Currently (Server.hs:1372-1381): ```haskell -forever $ do - batch <- atomically (readTBQueue rcvQ) - msgMap <- prefetchMsgs batch - foldrM (process msgMap) ([], []) batch - >>= ... +prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ())))) ``` -Rename `prefetchMsgs` to `prepareBatch`. It returns an additional `Map RecipientId (Either ErrorType ())` for association results. +`Left e` = batch-level failure (message pre-fetch or association query failed entirely). All SUBs/NSUBs in the batch get this error. -```haskell -forever $ do - batch <- atomically (readTBQueue rcvQ) - (msgMap, assocResults) <- prepareBatch batch - foldrM (process msgMap assocResults) ([], []) batch - >>= ... -``` +`Right map` = per-queue results as a tuple: +- `Maybe Message` - pre-fetched message for SUB queues, `Nothing` for NSUB or no message +- `Maybe (Either ErrorType ())` - association result. `Nothing` = no update needed. `Just (Right ())` = update succeeded. `Just (Left e)` = update failed for this queue. -`assocResults` contains entries only for queues that needed an association update. Keyed by `RecipientId`. `Right ()` means the update succeeded. `Left e` means it failed. +One map, one lookup per queue. `processCommand` passes both values to `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`. -### Step 2: Implement `prepareBatch` (Server.hs) +Queues not in the map (non-SUB/NSUB commands, failed verification) are not affected. -Replaces current `prefetchMsgs`. Does three things: +## prepareBatch implementation -1. Collects SUB queues for message pre-fetch (existing `tryPeekMsgs` logic, unchanged). +One accumulating fold over the batch, collecting three lists: +- `subMsgQs :: [StoreQueue s]` - SUB queues for message pre-fetch +- `rcvAssocQs :: [StoreQueue s]` - SUB queues needing `rcv_service_id` update (`clntServiceId /= rcvServiceId qr`) +- `ntfAssocQs :: [StoreQueue s]` - NSUB queues needing `ntf_service_id` update (`clntServiceId /= ntfServiceId` from `NtfCreds`) -2. Classifies each SUB/NSUB queue's association need by reading `queueServiceId` from the already-loaded `QueueRec` in `VerifiedTransmission` and comparing with `clntServiceId`. Produces `NonEmpty (Either ErrorType NeedsAssocUpdate)` aligned with the batch. Error if `q_ = Nothing` for a SUB/NSUB command. `True` if the queue needs its association updated. `False` if no change needed. +Classification reads from the already-loaded `QueueRec` in `VerifiedTransmission` - no extra DB query. -3. Collects `StoreQueue`s where classification produced `Right True`. If non-empty, calls `setQueueServices` with `clntServiceId` as target and this list. Gets back `Set RecipientId` of queues that were actually updated. +Then three store calls (each skipped if its list is empty): +1. `tryPeekMsgs ms subMsgQs` -> `Map RecipientId Message` +2. `setRcvQueueServices (queueStore ms) clntServiceId rcvAssocQs` -> `Set RecipientId` +3. `setNtfQueueServices (queueStore ms) clntServiceId ntfAssocQs` -> `Set RecipientId` -4. Builds `assocResults :: Map RecipientId (Either ErrorType ())`: for each queue that needed an update (`Right True`), if its `recipientId` is in the returned set then `Right ()`, otherwise `Left AUTH`. +Then one pass to merge results into `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`: +- For each SUB queue: `(M.lookup rId msgMap, assocResult rId rcvUpdated rcvAssocQs)` +- For each NSUB queue: `(Nothing, assocResult rId ntfUpdated ntfAssocQs)` -### Step 3: Thread `assocResults` through `processCommand` (Server.hs:1463) +Where `assocResult rId updated assocQs` = if the queue was in `assocQs` (needed update), then `Just (Right ())` if `rId` is in `updated`, else `Just (Left AUTH)`. If not in `assocQs` (no update needed), `Nothing`. -Add parameter: -```haskell -processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> Map RecipientId (Either ErrorType ()) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) -``` +If any of the three calls fails entirely, return `Left e`. -In the SUB case, pass `M.lookup entId assocResults` to `subscribeQueueAndDeliver`. -In the NSUB case, pass `M.lookup entId assocResults` to `subscribeNotifications`. -In the forwarded command call (line 2094), pass `M.empty`. -All other commands ignore it. +## Store interface -### Step 4: Thread through `subscribeQueueAndDeliver` (Server.hs:1631) and `subscribeNotifications` (Server.hs:1737) +Replace the polymorphic `setQueueServices` with two plain functions in `QueueStoreClass`: -Both gain `assocResult :: Maybe (Either ErrorType ())` and pass it to `sharedSubscribeQueue`. - -`subscribeQueueAndDeliver` signature becomes: ```haskell -subscribeQueueAndDeliver :: Maybe Message -> Maybe (Either ErrorType ()) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage +setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) +setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) ``` -### Step 5: Modify `sharedSubscribeQueue` (Server.hs:1757) +No `SParty p` polymorphism. Each function knows its column. -Gains `assocResult :: Maybe (Either ErrorType ())`. +### Postgres implementation -Where the queue needs a new or changed association (line 1772), currently: -```haskell -| otherwise -> runExceptT $ do - ExceptT $ setQueueService (queueStore ms) q party (Just serviceId) - hasSub <- ... +`setRcvQueueServices`: +```sql +UPDATE msg_queues SET rcv_service_id = ? +WHERE recipient_id IN ? AND deleted_at IS NULL +RETURNING recipient_id ``` -Becomes: -```haskell -| otherwise -> case assocResult of - Just (Left e) -> pure $ Left e - _ -> runExceptT $ do - hasSub <- ... +`setNtfQueueServices`: +```sql +UPDATE msg_queues SET ntf_service_id = ? +WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL +RETURNING recipient_id ``` -`Just (Left e)` means the batch update failed for this queue - return the error. -`Just (Right ())` means the batch update succeeded - skip `setQueueService`, proceed with STM work. -`Nothing` cannot happen here because `prepareBatch` always runs before this code and classifies every SUB/NSUB queue. +After each batch query, for each queue in the returned set: +1. Read QueueRec TVar, update with new serviceId +2. Write store log entry -Same change where removing association (line 1787): -```haskell -Just _ -> case assocResult of - Just (Left e) -> pure $ Left e - _ -> runExceptT $ do - liftIO $ incSrvStat srvAssocRemoved - ... -``` +### STM implementation -Queues that are already associated correctly or have no service involvement have `assocResult = Nothing` (not in the map). These paths don't call `setQueueService` today, so nothing changes for them. +Loop over queues, call existing per-item logic, collect succeeded `RecipientId`s into a Set. -### Step 6: Add `setQueueServices` to `QueueStoreClass` (QueueStore/Types.hs:53) +## Downstream changes in Server.hs -```haskell -setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Set RecipientId) -``` +### processCommand -Takes target `serviceId` and list of queues. Returns set of `RecipientId`s that were actually updated in the DB. +Gains one parameter: `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`. -### Step 7: Postgres implementation (QueueStore/Postgres.hs) +SUB case: `M.lookup entId prepared` gives `Just (msg_, assocResult)` or `Nothing`. Pass both to `subscribeQueueAndDeliver`. -For `SRecipientService`: -```sql -UPDATE msg_queues SET rcv_service_id = ? -WHERE recipient_id IN ? AND deleted_at IS NULL -RETURNING recipient_id -``` +NSUB case: `M.lookup entId prepared` gives `Just (Nothing, assocResult)` or `Nothing`. Pass `assocResult` to `subscribeNotifications`. -For `SNotifierService`: -```sql -UPDATE msg_queues SET ntf_service_id = ? -WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL -RETURNING recipient_id -``` +Forwarded commands: pass `M.empty`. -Build `Set RecipientId` from RETURNING rows. +### subscribeQueueAndDeliver -After the batch query, for each queue whose `recipientId` is in the returned set: -1. Read `QueueRec` from TVar, update with new `serviceId` (same as `updateQueueRec` at line 502-504) -2. Write store log entry (same as `withLog` at line 505) +Takes `Maybe Message` and `Maybe (Either ErrorType ())` as before. No change in how it uses them. -Queues not in the returned set are not updated (deleted between verification and UPDATE). The caller sees them absent from the set and produces `Left AUTH`. +### sharedSubscribeQueue -No per-queue lock needed: the batch UPDATE is a single SQL statement (Postgres handles row-level locking internally), and SUB/NSUB processing is single-threaded per connected client. +Takes `Maybe (Either ErrorType ())`. On paths needing association update: +- `Just (Left e)` -> return error +- `Just (Right ())` -> skip `setQueueService`, proceed with STM work +- `Nothing` -> no update needed, proceed with existing logic -### Step 8: STM implementation (QueueStore/STM.hs) +## Implementation order (top-down) -Loop over queues. For each: -1. Run existing `setService` STM logic from `setQueueService` (line 319-334): read QueueRec, update TVar, update per-service queue sets -2. If succeeded, add `recipientId` to result set -3. Write store log entry +1. Define the `prepareBatch` contract and thread one map through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` (Server.hs) +2. Implement `prepareBatch` with the fold, three calls, and merge (Server.hs) +3. Add `setRcvQueueServices` and `setNtfQueueServices` to `QueueStoreClass` (Types.hs) +4. Implement for Postgres with batch `UPDATE ... RETURNING` (Postgres.hs) +5. Implement for STM as loop (STM.hs) +6. Implement for Journal as delegation (Journal.hs) -Return accumulated `Set RecipientId`. +At step 2, store functions can initially be stubs returning empty sets. Steps 3-6 fill in the real implementations. ## Files changed | File | Change | |---|---| -| `src/Simplex/Messaging/Server.hs` | Rename `prefetchMsgs` to `prepareBatch` adding classification and `setQueueServices` call. Thread `assocResults` through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`. Replace `setQueueService` calls with `assocResult` check. | -| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setQueueServices` to `QueueStoreClass` | -| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement `setQueueServices` with batch `UPDATE ... RETURNING` + per-item TVar and store log updates | -| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement `setQueueServices` as loop over existing STM logic | +| `src/Simplex/Messaging/Server.hs` | `prepareBatch` with fold + merge; one map parameter through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` | +| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setRcvQueueServices`, `setNtfQueueServices` to `QueueStoreClass` | +| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement with batch `UPDATE ... RETURNING` + per-item TVar/log updates | +| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement as loop | +| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Delegate to underlying store | diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 9395f298fd..ed6e1aaab1 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1366,25 +1366,41 @@ client labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" let THandleParams {thVersion} = thParams' clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') - process msgMap assocResults t acc@(rs, msgs) = + process prepared t acc@(rs, msgs) = (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_))) - <$> processCommand clntServiceId thVersion msgMap assocResults t + <$> processCommand clntServiceId thVersion prepared t forever $ do batch <- atomically (readTBQueue rcvQ) - (msgMap, assocResults) <- prepareBatch clntServiceId batch - foldrM (process msgMap assocResults) ([], []) batch + prepared <- prepareBatch clntServiceId batch + foldrM (process prepared) ([], []) batch >>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_) where - prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message), Map RecipientId (Either ErrorType ())) + prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ())))) prepareBatch clntServiceId_ batch = do - let ts = L.toList batch - subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- ts] - updateQs = [q | (Just (q, qr), (_, _, Cmd SRecipient SUB)) <- ts, clntServiceId_ /= rcvServiceId qr] - msgMap <- if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs - assocResults <- if null updateQs then pure M.empty else do - updated <- liftIO $ setQueueServices @(StoreQueue s) (queueStore ms) SRecipientService clntServiceId_ updateQs - pure $ M.fromList [(recipientId q, if S.member (recipientId q) updated then Right () else Left AUTH) | q <- updateQs] - pure (msgMap, assocResults) + let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldl' classify ([], [], []) $ L.toList batch + classify (!msgQs, !rcvQs, !ntfQs) = \case + (Just (q, qr), (_, _, Cmd SRecipient SUB)) + | clntServiceId_ /= rcvServiceId qr -> (q : msgQs, q : rcvQs, ntfQs) + | otherwise -> (q : msgQs, rcvQs, ntfQs) + (Just (q, qr), (_, _, Cmd SNotifier NSUB)) + | clntServiceId_ /= (notifier qr >>= ntfServiceId) -> (msgQs, rcvQs, q : ntfQs) + | otherwise -> (msgQs, rcvQs, ntfQs) + _ -> (msgQs, rcvQs, ntfQs) + liftIO $ runExceptT $ do + msgs <- if null subMsgQs then pure M.empty else tryPeekMsgs ms subMsgQs + rcvUpdated <- if null rcvAssocQs then pure S.empty else ExceptT $ Right <$> setRcvQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ rcvAssocQs + ntfUpdated <- if null ntfAssocQs then pure S.empty else ExceptT $ Right <$> setNtfQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ ntfAssocQs + let rcvSet = S.fromList $ map recipientId rcvAssocQs + assocResult rId updated = if S.member rId updated then Right () else Left AUTH + mkEntry q = + let rId = recipientId q + msg_ = M.lookup rId msgs + assoc_ + | S.member rId rcvSet = Just $ assocResult rId rcvUpdated + | otherwise = Nothing + in (rId, (msg_, assoc_)) + mkNtfEntry q = let rId = recipientId q in (rId, (Nothing, Just $ assocResult rId ntfUpdated)) + pure $ M.fromList $ map mkEntry subMsgQs <> map mkNtfEntry ntfAssocQs processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage) processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of @@ -1466,8 +1482,8 @@ client mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> Map RecipientId (Either ErrorType ()) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) - processCommand clntServiceId clntVersion msgMap assocResults (q_, (corrId, entId, cmd)) = case cmd of + processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) + processCommand clntServiceId clntVersion prepared (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> case command of SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k @@ -1478,7 +1494,11 @@ client LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of - Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications (M.lookup entId assocResults) q ntfCreds + Just (q, QueueRec {notifier = Just ntfCreds}) -> + let assoc_ = case prepared of + Left _ -> Nothing + Right prepMap -> M.lookup entId prepMap >>= snd + in subscribeNotifications assoc_ q ntfCreds _ -> pure $ ERR INTERNAL Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash) @@ -1491,9 +1511,10 @@ client pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth Cmd SRecipient command -> case command of - SUB -> case msgMap of + SUB -> case prepared of Left e -> pure $ Just (err e, Nothing) - Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) (M.lookup entId assocResults) + Right prepMap -> let (msg_, assoc_) = maybe (Nothing, Nothing) id (M.lookup entId prepMap) + in withQueue' $ subscribeQueueAndDeliver msg_ assoc_ GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -2104,7 +2125,7 @@ client -- rejectOrVerify filters allowed commands, no need to repeat it here. -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types). -- `fst` removes empty message that is only returned for `SUB` command - Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) M.empty t'') + Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'') -- encode response r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 1707038248..a75e31ad95 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -353,8 +353,10 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where {-# INLINE getCreateService #-} setQueueService = withQS setQueueService {-# INLINE setQueueService #-} - setQueueServices st = withQS (\qs -> setQueueServices qs) st - {-# INLINE setQueueServices #-} + setRcvQueueServices st = withQS (\qs -> setRcvQueueServices qs) st + {-# INLINE setRcvQueueServices #-} + setNtfQueueServices st = withQS (\qs -> setNtfQueueServices qs) st + {-# INLINE setNtfQueueServices #-} getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s)) {-# INLINE getQueueNtfServices #-} getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s)) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 81dc7271bf..85da0feb4c 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -504,7 +504,10 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where atomically $ writeTVar (queueRec sq) $ Just q' withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId - setQueueServices _ _ _ _ = pure S.empty -- TODO batch implementation + setRcvQueueServices _ _ [] = pure S.empty + setRcvQueueServices _ _ _ = pure S.empty -- TODO batch implementation + setNtfQueueServices _ _ [] = pure S.empty + setNtfQueueServices _ _ _ = pure S.empty -- TODO batch implementation getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 9eebc803e5..295fae7e6f 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -337,7 +337,8 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where mapM_ (removeServiceQueue st serviceSel qId) prevSrvId mapM_ (addServiceQueue st serviceSel qId) serviceId - setQueueServices _ _ _ _ = pure S.empty -- TODO loop implementation + setRcvQueueServices _ _ _ = pure S.empty -- TODO loop implementation + setNtfQueueServices _ _ _ = pure S.empty -- TODO loop implementation getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = do diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index aee602d52f..61a089efde 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -52,7 +52,8 @@ class StoreQueueClass q => QueueStoreClass q s where deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec) getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId) setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ()) - setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Set RecipientId) + setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) + setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash)) From 462ed7ce8de216b42986c5a0a45a7883d446273b Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Thu, 2 Apr 2026 11:25:02 +0000 Subject: [PATCH 3/6] simpler --- src/Simplex/Messaging/Server.hs | 84 ++++++++----------- .../Messaging/Server/MsgStore/Journal.hs | 6 +- .../Messaging/Server/QueueStore/Postgres.hs | 6 +- .../Messaging/Server/QueueStore/STM.hs | 3 +- .../Messaging/Server/QueueStore/Types.hs | 5 +- 5 files changed, 44 insertions(+), 60 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index ed6e1aaab1..6df24e4993 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1366,41 +1366,37 @@ client labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" let THandleParams {thVersion} = thParams' clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') - process prepared t acc@(rs, msgs) = + process batchSubs t acc@(rs, msgs) = (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_))) - <$> processCommand clntServiceId thVersion prepared t + <$> processCommand clntServiceId thVersion batchSubs t forever $ do batch <- atomically (readTBQueue rcvQ) - prepared <- prepareBatch clntServiceId batch - foldrM (process prepared) ([], []) batch + batchSubs <- prepareBatchSubs clntServiceId batch + foldrM (process batchSubs) ([], []) batch >>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_) where - prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ())))) - prepareBatch clntServiceId_ batch = do - let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldl' classify ([], [], []) $ L.toList batch - classify (!msgQs, !rcvQs, !ntfQs) = \case + prepareBatchSubs :: + Maybe ServiceId -> + NonEmpty (VerifiedTransmission s) -> + M s (Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ()))) + prepareBatchSubs clntServiceId_ batch = do + let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldr partitionSubs ([], [], []) batch + partitionSubs t (msgQs, rcvQs, ntfQs) = case t of (Just (q, qr), (_, _, Cmd SRecipient SUB)) | clntServiceId_ /= rcvServiceId qr -> (q : msgQs, q : rcvQs, ntfQs) | otherwise -> (q : msgQs, rcvQs, ntfQs) (Just (q, qr), (_, _, Cmd SNotifier NSUB)) | clntServiceId_ /= (notifier qr >>= ntfServiceId) -> (msgQs, rcvQs, q : ntfQs) - | otherwise -> (msgQs, rcvQs, ntfQs) _ -> (msgQs, rcvQs, ntfQs) liftIO $ runExceptT $ do - msgs <- if null subMsgQs then pure M.empty else tryPeekMsgs ms subMsgQs - rcvUpdated <- if null rcvAssocQs then pure S.empty else ExceptT $ Right <$> setRcvQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ rcvAssocQs - ntfUpdated <- if null ntfAssocQs then pure S.empty else ExceptT $ Right <$> setNtfQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ ntfAssocQs - let rcvSet = S.fromList $ map recipientId rcvAssocQs - assocResult rId updated = if S.member rId updated then Right () else Left AUTH - mkEntry q = - let rId = recipientId q - msg_ = M.lookup rId msgs - assoc_ - | S.member rId rcvSet = Just $ assocResult rId rcvUpdated - | otherwise = Nothing - in (rId, (msg_, assoc_)) - mkNtfEntry q = let rId = recipientId q in (rId, (Nothing, Just $ assocResult rId ntfUpdated)) - pure $ M.fromList $ map mkEntry subMsgQs <> map mkNtfEntry ntfAssocQs + rcvAssocs <- ifNotNull rcvAssocQs $ setService SRecipientService clntServiceId_ + ntfAssocs <- ifNotNull ntfAssocQs $ setService SNotifierService clntServiceId_ + msgs <- ifNotNull subMsgQs $ tryPeekMsgs ms + pure (msgs, rcvAssocs, ntfAssocs) + where + ifNotNull qs f = if null qs then pure M.empty else f qs + setService :: (PartyI p, ServiceParty p) => SParty p -> Maybe ServiceId -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId (Either ErrorType ())) + setService party sId = ExceptT . setQueueServices (queueStore ms) party sId processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage) processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of @@ -1482,8 +1478,8 @@ client mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) - processCommand clntServiceId clntVersion prepared (q_, (corrId, entId, cmd)) = case cmd of + processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) + processCommand clntServiceId clntVersion batchSubs (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> case command of SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k @@ -1495,10 +1491,8 @@ client LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of Just (q, QueueRec {notifier = Just ntfCreds}) -> - let assoc_ = case prepared of - Left _ -> Nothing - Right prepMap -> M.lookup entId prepMap >>= snd - in subscribeNotifications assoc_ q ntfCreds + either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds) + $ batchSubs >>= sequence . M.lookup (recipientId q) . \(_, _, n) -> n _ -> pure $ ERR INTERNAL Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash) @@ -1511,10 +1505,11 @@ client pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth Cmd SRecipient command -> case command of - SUB -> case prepared of + SUB -> case batchSubs of Left e -> pure $ Just (err e, Nothing) - Right prepMap -> let (msg_, assoc_) = maybe (Nothing, Nothing) id (M.lookup entId prepMap) - in withQueue' $ subscribeQueueAndDeliver msg_ assoc_ + Right (msgs, rcvAssocs, _) -> case sequence $ M.lookup entId rcvAssocs of + Left e -> pure $ Just (err e, Nothing) + Right _ -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -1655,11 +1650,11 @@ client suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg) suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q - subscribeQueueAndDeliver :: Maybe Message -> Maybe (Either ErrorType ()) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage - subscribeQueueAndDeliver msg_ assocResult q qr@QueueRec {rcvServiceId} = + subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage + subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} = liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> - sharedSubscribeQueue assocResult q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case + sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case Left e -> pure (err e, Nothing) Right s -> deliver s Just s@Sub {subThread} -> do @@ -1761,9 +1756,9 @@ client then action q qr else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q) - subscribeNotifications :: Maybe (Either ErrorType ()) -> StoreQueue s -> NtfCreds -> M s BrokerMsg - subscribeNotifications assocResult q NtfCreds {ntfServiceId} = - sharedSubscribeQueue assocResult q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case + subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg + subscribeNotifications q NtfCreds {ntfServiceId} = + sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case Left e -> pure $ ERR e Right (hasSub, _) -> do when (isNothing clntServiceId) $ @@ -1772,7 +1767,6 @@ client sharedSubscribeQueue :: (PartyI p, ServiceParty p) => - Maybe (Either ErrorType ()) -> StoreQueue s -> SParty p -> Maybe ServiceId -> @@ -1782,7 +1776,7 @@ client STM sub -> (ServerStats -> ServiceStats) -> M s (Either ErrorType (Bool, Maybe sub)) - sharedSubscribeQueue assocResult q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do + sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do stats <- asks serverStats let incSrvStat sel = incStat $ sel $ servicesSel stats writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId) @@ -1797,9 +1791,7 @@ client incSrvStat srvSubQueues incSrvStat srvAssocDuplicate pure $ Right (hasSub, Nothing) - | otherwise -> case assocResult of - Just (Left e) -> pure $ Left e - _ -> runExceptT $ do + | otherwise -> runExceptT $ do -- association already done in prepareBatch hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub atomically writeSub @@ -1813,9 +1805,7 @@ client -- This function is used when queue association with the service is created. incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDS hash Nothing -> case queueServiceId of - Just _ -> case assocResult of - Just (Left e) -> pure $ Left e - _ -> runExceptT $ do + Just _ -> runExceptT $ do -- unassociation already done in prepareBatch liftIO $ incSrvStat srvAssocRemoved -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions. @@ -2125,7 +2115,7 @@ client -- rejectOrVerify filters allowed commands, no need to repeat it here. -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types). -- `fst` removes empty message that is only returned for `SUB` command - Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'') + Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right (M.empty, M.empty, M.empty)) t'') -- encode response r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index a75e31ad95..1707038248 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -353,10 +353,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where {-# INLINE getCreateService #-} setQueueService = withQS setQueueService {-# INLINE setQueueService #-} - setRcvQueueServices st = withQS (\qs -> setRcvQueueServices qs) st - {-# INLINE setRcvQueueServices #-} - setNtfQueueServices st = withQS (\qs -> setNtfQueueServices qs) st - {-# INLINE setNtfQueueServices #-} + setQueueServices st = withQS (\qs -> setQueueServices qs) st + {-# INLINE setQueueServices #-} getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s)) {-# INLINE getQueueNtfServices #-} getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s)) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 85da0feb4c..5e2f1492b9 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -504,10 +504,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where atomically $ writeTVar (queueRec sq) $ Just q' withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId - setRcvQueueServices _ _ [] = pure S.empty - setRcvQueueServices _ _ _ = pure S.empty -- TODO batch implementation - setNtfQueueServices _ _ [] = pure S.empty - setNtfQueueServices _ _ _ = pure S.empty -- TODO batch implementation + setQueueServices _ _ _ [] = pure $ Right M.empty + setQueueServices _ _ _ _ = pure $ Right M.empty -- TODO batch implementation getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 295fae7e6f..cb6400f177 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -337,8 +337,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where mapM_ (removeServiceQueue st serviceSel qId) prevSrvId mapM_ (addServiceQueue st serviceSel qId) serviceId - setRcvQueueServices _ _ _ = pure S.empty -- TODO loop implementation - setNtfQueueServices _ _ _ = pure S.empty -- TODO loop implementation + setQueueServices _ _ _ _ = pure $ Right M.empty -- TODO loop implementation getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = do diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 61a089efde..415a5f33cb 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -16,7 +16,7 @@ import Control.Concurrent.STM import Control.Monad import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) -import Data.Set (Set) +import Data.Map.Strict (Map) import Data.Text (Text) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore @@ -52,8 +52,7 @@ class StoreQueueClass q => QueueStoreClass q s where deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec) getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId) setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ()) - setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) - setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId) + setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (Map RecipientId (Either ErrorType ()))) getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash)) From 398c7e30c5d6f7725d8b7d90589d2096fb551c8e Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Thu, 2 Apr 2026 16:46:51 +0000 Subject: [PATCH 4/6] batch assoc functions --- .../Messaging/Server/MsgStore/Journal.hs | 2 +- .../Messaging/Server/QueueStore/Postgres.hs | 24 ++++++++++++++++++- .../Messaging/Server/QueueStore/STM.hs | 4 +++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 1707038248..185c113b78 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -353,7 +353,7 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where {-# INLINE getCreateService #-} setQueueService = withQS setQueueService {-# INLINE setQueueService #-} - setQueueServices st = withQS (\qs -> setQueueServices qs) st + setQueueServices = withQS setQueueServices {-# INLINE setQueueServices #-} getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s)) {-# INLINE getQueueNtfServices #-} diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 5e2f1492b9..318191c7cc 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -505,7 +505,29 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId setQueueServices _ _ _ [] = pure $ Right M.empty - setQueueServices _ _ _ _ = pure $ Right M.empty -- TODO batch implementation + setQueueServices st party serviceId qs = E.uninterruptibleMask_ $ runExceptT $ do + updated <- S.fromList <$> withDB' "setQueueServices" st (\db -> + map fromOnly <$> DB.query db updateQuery (serviceId, In (map recipientId qs))) + results <- liftIO $ forM qs $ \sq -> do + let rId = recipientId sq + (rId,) <$> if S.member rId updated + then readQueueRecIO (queueRec sq) $>>= \q -> do + atomically $ writeTVar (queueRec sq) $ Just $ updateRec q + withLog "setQueueServices" st $ \sl -> logQueueService sl rId party serviceId + pure $ Right () + else pure $ Left AUTH + pure $ M.fromList results + where + updateQuery = case party of + SRecipientService -> + "UPDATE msg_queues SET rcv_service_id = ? WHERE recipient_id IN ? AND deleted_at IS NULL RETURNING recipient_id" + SNotifierService -> + "UPDATE msg_queues SET ntf_service_id = ? WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL RETURNING recipient_id" + updateRec q = case party of + SRecipientService -> q {rcvServiceId = serviceId} + SNotifierService -> case notifier q of + Just nc -> q {notifier = Just nc {ntfServiceId = serviceId}} + Nothing -> q getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index cb6400f177..3fad426a3c 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -337,7 +337,9 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where mapM_ (removeServiceQueue st serviceSel qId) prevSrvId mapM_ (addServiceQueue st serviceSel qId) serviceId - setQueueServices _ _ _ _ = pure $ Right M.empty -- TODO loop implementation + setQueueServices st party serviceId qs = Right . M.fromList <$> mapM setOne qs + where + setOne sq = (recipientId sq,) <$> setQueueService st sq party serviceId getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = do From 4e8fa403c8eeb27541ddfbfaab33d3cb312555fd Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Thu, 2 Apr 2026 17:22:32 +0000 Subject: [PATCH 5/6] clean up --- src/Simplex/Messaging/Server.hs | 55 +++++++++---------- .../Messaging/Server/QueueStore/STM.hs | 4 +- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 6df24e4993..92fa1de701 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1654,9 +1654,7 @@ client subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} = liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> - sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case - Left e -> pure (err e, Nothing) - Right s -> deliver s + deliver =<< sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices Just s@Sub {subThread} -> do stats <- asks serverStats case subThread of @@ -1757,13 +1755,11 @@ client else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q) subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg - subscribeNotifications q NtfCreds {ntfServiceId} = - sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case - Left e -> pure $ ERR e - Right (hasSub, _) -> do - when (isNothing clntServiceId) $ - asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub) - pure $ SOK clntServiceId + subscribeNotifications q NtfCreds {ntfServiceId} = do + (hasSub, _) <- sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices + when (isNothing clntServiceId) $ + asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub) + pure $ SOK clntServiceId sharedSubscribeQueue :: (PartyI p, ServiceParty p) => @@ -1775,7 +1771,7 @@ client (Client s -> TVar (Int64, IdsHash)) -> STM sub -> (ServerStats -> ServiceStats) -> - M s (Either ErrorType (Bool, Maybe sub)) + M s (Bool, Maybe sub) sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do stats <- asks serverStats let incSrvStat sel = incStat $ sel $ servicesSel stats @@ -1790,33 +1786,32 @@ client incSrvStat srvSubCount incSrvStat srvSubQueues incSrvStat srvAssocDuplicate - pure $ Right (hasSub, Nothing) - | otherwise -> runExceptT $ do - -- association already done in prepareBatch - hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub - atomically writeSub - liftIO $ do - unless hasSub $ incSrvStat srvSubCount - incSrvStat srvSubQueues - incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId - pure (hasSub, Nothing) + pure (hasSub, Nothing) + | otherwise -> do + -- association already done in prepareBatchSubs + hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub + atomically writeSub + unless hasSub $ incSrvStat srvSubCount + incSrvStat srvSubQueues + incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId + pure (hasSub, Nothing) where hasServiceSub = ((0 /=) . fst) <$> readTVar (clientServiceSubs clnt) -- This function is used when queue association with the service is created. incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDS hash Nothing -> case queueServiceId of - Just _ -> runExceptT $ do - -- unassociation already done in prepareBatch - liftIO $ incSrvStat srvAssocRemoved - -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions. - -- For notification service it can only be Just if storage and session states diverge. - r <- atomically $ getSubscription >>= newSub - atomically writeSub - pure r + Just _ -> do + -- unassociation already done in prepareBatchSubs + incSrvStat srvAssocRemoved + -- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions. + -- For notification service it can only be Just if storage and session states diverge. + r <- atomically $ getSubscription >>= newSub + atomically writeSub + pure r Nothing -> do r@(hasSub, _) <- atomically $ getSubscription >>= newSub unless hasSub $ atomically writeSub - pure $ Right r + pure r where getSubscription = TM.lookup entId $ clientSubs clnt newSub = \case diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 3fad426a3c..583bbf3384 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -337,9 +337,9 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where mapM_ (removeServiceQueue st serviceSel qId) prevSrvId mapM_ (addServiceQueue st serviceSel qId) serviceId - setQueueServices st party serviceId qs = Right . M.fromList <$> mapM setOne qs + setQueueServices st party serviceId qs = Right . M.fromList <$> mapM setQueue qs where - setOne sq = (recipientId sq,) <$> setQueueService st sq party serviceId + setQueue sq = (recipientId sq,) <$> setQueueService st sq party serviceId getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])) getQueueNtfServices st ntfs = do From 7c478a783e2eac39cd4cd743960a1842b15ded88 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 2 Apr 2026 18:30:16 +0100 Subject: [PATCH 6/6] fix --- src/Simplex/Messaging/Server.hs | 16 ++++++---------- .../Messaging/Server/QueueStore/Postgres.hs | 3 ++- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 92fa1de701..1b7d920ac5 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1492,7 +1492,7 @@ client Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of Just (q, QueueRec {notifier = Just ntfCreds}) -> either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds) - $ batchSubs >>= sequence . M.lookup (recipientId q) . \(_, _, n) -> n + $ batchSubs >>= \(_, _, ntfAssocs) -> sequence (M.lookup (recipientId q) ntfAssocs) _ -> pure $ ERR INTERNAL Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash) @@ -1505,11 +1505,9 @@ client pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth Cmd SRecipient command -> case command of - SUB -> case batchSubs of + SUB -> case batchSubs >>= \(msgs, rcvAssocs, _) -> sequence (M.lookup entId rcvAssocs) $> msgs of Left e -> pure $ Just (err e, Nothing) - Right (msgs, rcvAssocs, _) -> case sequence $ M.lookup entId rcvAssocs of - Left e -> pure $ Just (err e, Nothing) - Right _ -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) + Right msgs -> withQueue' $ subscribeQueueAndDeliver $ M.lookup entId msgs GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -1654,7 +1652,7 @@ client subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} = liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> - deliver =<< sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices + deliver =<< sharedSubscribeQueue q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices Just s@Sub {subThread} -> do stats <- asks serverStats case subThread of @@ -1756,15 +1754,13 @@ client subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg subscribeNotifications q NtfCreds {ntfServiceId} = do - (hasSub, _) <- sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices + (hasSub, _) <- sharedSubscribeQueue q ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices when (isNothing clntServiceId) $ asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub) pure $ SOK clntServiceId sharedSubscribeQueue :: - (PartyI p, ServiceParty p) => StoreQueue s -> - SParty p -> Maybe ServiceId -> ServerSubscribers s -> (Client s -> TMap QueueId sub) -> @@ -1772,7 +1768,7 @@ client STM sub -> (ServerStats -> ServiceStats) -> M s (Bool, Maybe sub) - sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do + sharedSubscribeQueue q queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do stats <- asks serverStats let incSrvStat sel = incStat $ sel $ servicesSel stats writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 318191c7cc..ce1cf25db4 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -91,7 +91,7 @@ import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPServiceRole (..)) -import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>)) +import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>), ($>>=)) import System.Exit (exitFailure) import System.IO (IOMode (..), hFlush, stdout) import UnliftIO.STM @@ -504,6 +504,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where atomically $ writeTVar (queueRec sq) $ Just q' withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId + setQueueServices :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (M.Map RecipientId (Either ErrorType ()))) setQueueServices _ _ _ [] = pure $ Right M.empty setQueueServices st party serviceId qs = E.uninterruptibleMask_ $ runExceptT $ do updated <- S.fromList <$> withDB' "setQueueServices" st (\db ->