@@ -585,8 +585,6 @@ void TfBuilderInputUCX::DataHandlerThread(const unsigned pThreadIdx)
585585 continue ;
586586 }
587587
588- const auto lRmaGetStart = clock::now ();
589-
590588 StfMetaRdmaInfo lStfRdmaInfo = std::move (lStfMetaOpt.value ());
591589 UCXIovStfHeader &lStfMeta = lStfRdmaInfo.mStfMeta ;
592590 const std::uint64_t lTfId = lStfMeta.stf_hdr_meta ().stf_id ();
@@ -606,28 +604,33 @@ void TfBuilderInputUCX::DataHandlerThread(const unsigned pThreadIdx)
606604 continue ;
607605 }
608606
607+ clock::time_point lRmaGetStart = clock::now ();
609608 {
610- std::scoped_lock lStfSenderIoLock (lConn->mStfSenderIoLock );
611-
612609 if (!lStfMeta.stf_txg_iov ().empty ()) {
613610 ucx::io::dd_ucp_multi_req_v2 lRmaReqSem;
614- {
611+
612+ // Put the RDMA-GET operations in the exclusive section to prevent congestion in multi-worker configuration
613+ static std::mutex sRdmaSectionMutex ;
614+ { std::scoped_lock lRdmaSectionLock (sRdmaSectionMutex );
615+
615616 // It's safe to use shared key lock because preprocess thread created required keys for this stf
616- std::shared_lock lKeysLock (lConn->mRemoteKeysLock );
617+ { std::shared_lock lKeysLock (lConn->mRemoteKeysLock );
618+ lRmaGetStart = clock::now (); // update with exact time we started RDMA operations
617619
618- // RMA get all the txgs
619- for (const auto &lStfTxg : lStfMeta.stf_txg_iov ()) {
620- assert (lStfTxg.len () > 0 );
620+ // RMA get all the txgs
621+ for (const auto &lStfTxg : lStfMeta.stf_txg_iov ()) {
622+ assert (lStfTxg.len () > 0 );
621623
622- void *lTxgUcxPtr = mTimeFrameBuilder .mMemRes .mDataMemRes ->get_ucx_ptr (lTxgPtrs[lStfTxg.txg ()]);
623- const ucp_rkey_h lRemoteKey = lConn->mRemoteKeys [lStfMeta.data_regions (lStfTxg.region ()).region_rkey ()];
624- ucx::io::get (lConn->ucp_ep , lTxgUcxPtr, lStfTxg.len (), lStfTxg.start (), lRemoteKey, &lRmaReqSem);
624+ void *lTxgUcxPtr = mTimeFrameBuilder .mMemRes .mDataMemRes ->get_ucx_ptr (lTxgPtrs[lStfTxg.txg ()]);
625+ const ucp_rkey_h lRemoteKey = lConn->mRemoteKeys [lStfMeta.data_regions (lStfTxg.region ()).region_rkey ()];
626+ ucx::io::get (lConn->ucp_ep , lTxgUcxPtr, lStfTxg.len (), lStfTxg.start (), lRemoteKey, &lRmaReqSem);
627+ }
625628 }
626- }
627629
628- // wait for final completion
629- if (!lRmaReqSem.wait (lConn->mWorker , mRdmaPollingWait )) {
630+ // wait for the completion
631+ if (!lRmaReqSem.wait (lConn->mWorker , mRdmaPollingWait )) {
630632 break ;
633+ }
631634 }
632635 }
633636
0 commit comments