Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions plans/20260401_01_batch_queue_associations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Server: batch queue service associations

When a batch of SUB or NSUB commands arrives from a service client, each command that needs a new or removed service association calls `setQueueService` individually - one DB write per command. For 135 commands per batch, that's 135 individual `UPDATE msg_queues` queries.

## Goal

Reduce to at most 2 DB queries per batch (one for rcv associations, one for ntf associations), using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated.

Also fuse message pre-fetch and association batching into a single batch preparation step with a clean contract.

## Contract

```haskell
prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))))
```

`Left e` = batch-level failure (message pre-fetch or association query failed entirely). All SUBs/NSUBs in the batch get this error.

`Right map` = per-queue results as a tuple:
- `Maybe Message` - pre-fetched message for SUB queues, `Nothing` for NSUB or no message
- `Maybe (Either ErrorType ())` - association result. `Nothing` = no update needed. `Just (Right ())` = update succeeded. `Just (Left e)` = update failed for this queue.

One map, one lookup per queue. `processCommand` passes both values to `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`.

Queues not in the map (non-SUB/NSUB commands, failed verification) are not affected.

## prepareBatch implementation

One accumulating fold over the batch, collecting three lists:
- `subMsgQs :: [StoreQueue s]` - SUB queues for message pre-fetch
- `rcvAssocQs :: [StoreQueue s]` - SUB queues needing `rcv_service_id` update (`clntServiceId /= rcvServiceId qr`)
- `ntfAssocQs :: [StoreQueue s]` - NSUB queues needing `ntf_service_id` update (`clntServiceId /= ntfServiceId` from `NtfCreds`)

Classification reads from the already-loaded `QueueRec` in `VerifiedTransmission` - no extra DB query.

Then three store calls (each skipped if its list is empty):
1. `tryPeekMsgs ms subMsgQs` -> `Map RecipientId Message`
2. `setRcvQueueServices (queueStore ms) clntServiceId rcvAssocQs` -> `Set RecipientId`
3. `setNtfQueueServices (queueStore ms) clntServiceId ntfAssocQs` -> `Set RecipientId`

Then one pass to merge results into `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`:
- For each SUB queue: `(M.lookup rId msgMap, assocResult rId rcvUpdated rcvAssocQs)`
- For each NSUB queue: `(Nothing, assocResult rId ntfUpdated ntfAssocQs)`

Where `assocResult rId updated assocQs` = if the queue was in `assocQs` (needed update), then `Just (Right ())` if `rId` is in `updated`, else `Just (Left AUTH)`. If not in `assocQs` (no update needed), `Nothing`.

If any of the three calls fails entirely, return `Left e`.

## Store interface

Replace the polymorphic `setQueueServices` with two plain functions in `QueueStoreClass`:

```haskell
setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
```

No `SParty p` polymorphism. Each function knows its column.

### Postgres implementation

`setRcvQueueServices`:
```sql
UPDATE msg_queues SET rcv_service_id = ?
WHERE recipient_id IN ? AND deleted_at IS NULL
RETURNING recipient_id
```

`setNtfQueueServices`:
```sql
UPDATE msg_queues SET ntf_service_id = ?
WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL
RETURNING recipient_id
```

After each batch query, for each queue in the returned set:
1. Read QueueRec TVar, update with new serviceId
2. Write store log entry

### STM implementation

Loop over queues, call existing per-item logic, collect succeeded `RecipientId`s into a Set.

## Downstream changes in Server.hs

### processCommand

Gains one parameter: `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`.

SUB case: `M.lookup entId prepared` gives `Just (msg_, assocResult)` or `Nothing`. Pass both to `subscribeQueueAndDeliver`.

NSUB case: `M.lookup entId prepared` gives `Just (Nothing, assocResult)` or `Nothing`. Pass `assocResult` to `subscribeNotifications`.

Forwarded commands: pass `M.empty`.

### subscribeQueueAndDeliver

Takes `Maybe Message` and `Maybe (Either ErrorType ())` as before. No change in how it uses them.

### sharedSubscribeQueue

Takes `Maybe (Either ErrorType ())`. On paths needing association update:
- `Just (Left e)` -> return error
- `Just (Right ())` -> skip `setQueueService`, proceed with STM work
- `Nothing` -> no update needed, proceed with existing logic

## Implementation order (top-down)

1. Define the `prepareBatch` contract and thread one map through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` (Server.hs)
2. Implement `prepareBatch` with the fold, three calls, and merge (Server.hs)
3. Add `setRcvQueueServices` and `setNtfQueueServices` to `QueueStoreClass` (Types.hs)
4. Implement for Postgres with batch `UPDATE ... RETURNING` (Postgres.hs)
5. Implement for STM as loop (STM.hs)
6. Implement for Journal as delegation (Journal.hs)

At step 2, store functions can initially be stubs returning empty sets. Steps 3-6 fill in the real implementations.

## Files changed

| File | Change |
|---|---|
| `src/Simplex/Messaging/Server.hs` | `prepareBatch` with fold + merge; one map parameter through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` |
| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setRcvQueueServices`, `setNtfQueueServices` to `QueueStoreClass` |
| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement with batch `UPDATE ... RETURNING` + per-item TVar/log updates |
| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement as loop |
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Delegate to underlying store |
94 changes: 53 additions & 41 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,19 +1366,37 @@ client
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
let THandleParams {thVersion} = thParams'
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
process msgMap t acc@(rs, msgs) =
process batchSubs t acc@(rs, msgs) =
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
<$> processCommand clntServiceId thVersion msgMap t
<$> processCommand clntServiceId thVersion batchSubs t
forever $ do
batch <- atomically (readTBQueue rcvQ)
msgMap <- prefetchMsgs batch
foldrM (process msgMap) ([], []) batch
batchSubs <- prepareBatchSubs clntServiceId batch
foldrM (process batchSubs) ([], []) batch
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
where
prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message))
prefetchMsgs batch =
let subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch]
in if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs
prepareBatchSubs ::
Maybe ServiceId ->
NonEmpty (VerifiedTransmission s) ->
M s (Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())))
prepareBatchSubs clntServiceId_ batch = do
let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldr partitionSubs ([], [], []) batch
partitionSubs t (msgQs, rcvQs, ntfQs) = case t of
(Just (q, qr), (_, _, Cmd SRecipient SUB))
| clntServiceId_ /= rcvServiceId qr -> (q : msgQs, q : rcvQs, ntfQs)
| otherwise -> (q : msgQs, rcvQs, ntfQs)
(Just (q, qr), (_, _, Cmd SNotifier NSUB))
| clntServiceId_ /= (notifier qr >>= ntfServiceId) -> (msgQs, rcvQs, q : ntfQs)
_ -> (msgQs, rcvQs, ntfQs)
liftIO $ runExceptT $ do
rcvAssocs <- ifNotNull rcvAssocQs $ setService SRecipientService clntServiceId_
ntfAssocs <- ifNotNull ntfAssocQs $ setService SNotifierService clntServiceId_
msgs <- ifNotNull subMsgQs $ tryPeekMsgs ms
pure (msgs, rcvAssocs, ntfAssocs)
where
ifNotNull qs f = if null qs then pure M.empty else f qs
setService :: (PartyI p, ServiceParty p) => SParty p -> Maybe ServiceId -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId (Either ErrorType ()))
setService party sId = ExceptT . setQueueServices (queueStore ms) party sId

processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
Expand Down Expand Up @@ -1460,8 +1478,8 @@ client
mkIncProxyStats ps psOwn own sel = do
incStat $ sel ps
when own $ incStat $ sel psOwn
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion batchSubs (q_, (corrId, entId, cmd)) = case cmd of
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
Cmd SSender command -> case command of
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
Expand All @@ -1472,7 +1490,9 @@ client
LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr
LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr
Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of
Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications q ntfCreds
Just (q, QueueRec {notifier = Just ntfCreds}) ->
either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds)
$ batchSubs >>= \(_, _, ntfAssocs) -> sequence (M.lookup (recipientId q) ntfAssocs)
_ -> pure $ ERR INTERNAL
Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of
Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash)
Expand All @@ -1485,9 +1505,9 @@ client
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
Cmd SRecipient command ->
case command of
SUB -> case msgMap of
SUB -> case batchSubs >>= \(msgs, rcvAssocs, _) -> sequence (M.lookup entId rcvAssocs) $> msgs of
Left e -> pure $ Just (err e, Nothing)
Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
Right msgs -> withQueue' $ subscribeQueueAndDeliver $ M.lookup entId msgs
GET -> withQueue getMessage
ACK msgId -> withQueue $ acknowledgeMsg msgId
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
Expand Down Expand Up @@ -1632,9 +1652,7 @@ client
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
Nothing ->
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
Left e -> pure (err e, Nothing)
Right s -> deliver s
deliver =<< sharedSubscribeQueue q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
Just s@Sub {subThread} -> do
stats <- asks serverStats
case subThread of
Expand Down Expand Up @@ -1735,26 +1753,22 @@ client
else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)

subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
subscribeNotifications q NtfCreds {ntfServiceId} =
sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case
Left e -> pure $ ERR e
Right (hasSub, _) -> do
when (isNothing clntServiceId) $
asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
pure $ SOK clntServiceId
subscribeNotifications q NtfCreds {ntfServiceId} = do
(hasSub, _) <- sharedSubscribeQueue q ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices
when (isNothing clntServiceId) $
asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
pure $ SOK clntServiceId

sharedSubscribeQueue ::
(PartyI p, ServiceParty p) =>
StoreQueue s ->
SParty p ->
Maybe ServiceId ->
ServerSubscribers s ->
(Client s -> TMap QueueId sub) ->
(Client s -> TVar (Int64, IdsHash)) ->
STM sub ->
(ServerStats -> ServiceStats) ->
M s (Either ErrorType (Bool, Maybe sub))
sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
M s (Bool, Maybe sub)
sharedSubscribeQueue q queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
stats <- asks serverStats
let incSrvStat sel = incStat $ sel $ servicesSel stats
writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId)
Expand All @@ -1768,25 +1782,23 @@ client
incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat srvAssocDuplicate
pure $ Right (hasSub, Nothing)
| otherwise -> runExceptT $ do
-- new or updated queue-service association
ExceptT $ setQueueService (queueStore ms) q party (Just serviceId)
pure (hasSub, Nothing)
| otherwise -> do
-- association already done in prepareBatchSubs
hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub
atomically writeSub
liftIO $ do
unless hasSub $ incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
unless hasSub $ incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
pure (hasSub, Nothing)
where
hasServiceSub = ((0 /=) . fst) <$> readTVar (clientServiceSubs clnt)
-- This function is used when queue association with the service is created.
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDs hash
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDS hash
Nothing -> case queueServiceId of
Just _ -> runExceptT $ do
ExceptT $ setQueueService (queueStore ms) q party Nothing
liftIO $ incSrvStat srvAssocRemoved
Just _ -> do
-- unassociation already done in prepareBatchSubs
incSrvStat srvAssocRemoved
-- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions.
-- For notification service it can only be Just if storage and session states diverge.
r <- atomically $ getSubscription >>= newSub
Expand All @@ -1795,7 +1807,7 @@ client
Nothing -> do
r@(hasSub, _) <- atomically $ getSubscription >>= newSub
unless hasSub $ atomically writeSub
pure $ Right r
pure r
where
getSubscription = TM.lookup entId $ clientSubs clnt
newSub = \case
Expand Down Expand Up @@ -2094,7 +2106,7 @@ client
-- rejectOrVerify filters allowed commands, no need to repeat it here.
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
-- `fst` removes empty message that is only returned for `SUB` command
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right (M.empty, M.empty, M.empty)) t'')
-- encode response
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right
Expand Down
2 changes: 2 additions & 0 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
{-# INLINE getCreateService #-}
setQueueService = withQS setQueueService
{-# INLINE setQueueService #-}
setQueueServices = withQS setQueueServices
{-# INLINE setQueueServices #-}
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
{-# INLINE getQueueNtfServices #-}
getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s))
Expand Down
Loading
Loading