Skip to content

Commit 2dc01a2

Browse files
committed
tfbuilder: add blocking mode for rdma operations (optional, saves 0.5 CPU core)
1 parent ce6a4f3 commit 2dc01a2

File tree

4 files changed

+63
-9
lines changed

4 files changed

+63
-9
lines changed

src/TfBuilder/TfBuilderInputUCX.cxx

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ bool TfBuilderInputUCX::start()
191191
{
192192
// setting configuration options
193193
mThreadPoolSize = std::clamp(mConfig->getUInt64Param(UcxTfBuilderThreadPoolSizeKey, UcxTfBuilderThreadPoolSizeDefault), std::size_t(1), std::size_t(256));
194-
IDDLOG("TfBuilderInputUCX: Configuration loaded. thread_pool={}", mThreadPoolSize);
194+
mRdmaPollingWait =mConfig->getBoolParam(UcxPollForRDMACompletionKey, UcxPollForRDMACompletionDefault);
195+
IDDLOG("TfBuilderInputUCX: Configuration loaded. thread_pool={} polling={}", mThreadPoolSize, mRdmaPollingWait);
195196

196197
auto &lConfStatus = mConfig->status();
197198

@@ -628,9 +629,18 @@ void TfBuilderInputUCX::DataHandlerThread(const unsigned pThreadIdx)
628629
ucx::io::get(lConn->ucp_ep, lTxgUcxPtr, lStfTxg.len(), lStfTxg.start(), lConn->mRemoteKeys[lStfMeta.data_regions(lStfTxg.region()).region_rkey()], &lRmaReqSem);
629630
}
630631
}
632+
631633
// wait for final completion
632-
if (!ucx::io::ucp_wait_poll(lConn->mWorker, lRmaReqSem)) {
633-
break;
634+
if (mRdmaPollingWait) {
635+
// polling
636+
if (!lRmaReqSem.wait_poll(lConn->mWorker)) {
637+
break;
638+
}
639+
} else {
640+
// blocking
641+
if (!lRmaReqSem.wait(lConn->mWorker)) {
642+
break;
643+
}
634644
}
635645

636646
// notify StfSender we completed (use mapped scratch memory)

src/TfBuilder/TfBuilderInputUCX.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <SubTimeFrameDataModel.h>
2323
#include <ConcurrentQueue.h>
24+
#include <DataDistributionOptions.h>
2425

2526
#include <UCXUtilities.h>
2627
#include <ucp/api/ucp.h>
@@ -171,6 +172,7 @@ class TfBuilderInputUCX
171172
std::unordered_map<std::string, std::size_t> mStfSenderToWorkerMap;
172173
std::unordered_map<std::string, std::shared_ptr<ConcurrentQueue<StfMetaRdmaInfo> > > mStfMetaWorkerQueues;
173174
std::vector<std::thread> mThreadPool;
175+
bool mRdmaPollingWait = UcxPollForRDMACompletionDefault;
174176

175177
// STF postprocess threads
176178
ConcurrentQueue<StfMetaRdmaInfo> mStfPostprocessQueue;

src/common/discovery/DataDistributionOptions.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ static constexpr std::uint64_t UcxStfSenderThreadPoolSizeDefault = 8;
8181
////////////////////////////////////////////////////////////////////////////////
8282

8383
// Define maximum number of concurrent STF transfers
84-
// The value should not be much grater than UcxTfBuilderThreadPoolSize as the requests will will not be processed immediately.
84+
// The value should be greater than max number of FLPs.
8585
static constexpr std::string_view MaxNumStfTransfersKey = "MaxNumStfTransfers";
8686
static constexpr std::uint64_t MaxNumStfTransferDefault = 300;
8787

@@ -95,14 +95,13 @@ static constexpr std::uint64_t StfSenderGrpcThreadPoolSizeDefault = 8;
9595

9696

9797
/// UCX transport
98-
// Size of receiver treadpool. Default 0 (number of cpu cores)
98+
// Size of the receiver thread pool. Default is 1, which works best. Should not be set above 2, to avoid congestion on the receiver.
9999
static constexpr std::string_view UcxTfBuilderThreadPoolSizeKey = "UcxTfBuilderThreadPoolSize";
100100
static constexpr std::uint64_t UcxTfBuilderThreadPoolSizeDefault = 1;
101101

102-
// Number of rma_get operation in flight, per ucx thread
103-
// NOTE: deprecated in DD 1.4
104-
// static constexpr std::string_view UcxNumConcurrentRmaGetOpsKey = "UcxNumConcurrentRmaGetOps";
105-
// static constexpr std::uint64_t UcxNumConcurrentRmaGetOpsDefault = 32;
102+
// Select how to wait for RDMA completion: true = poll the UCX worker (busy-wait), false = block on the worker until it signals progress.
103+
static constexpr std::string_view UcxPollForRDMACompletionKey = "UcxPollForRDMACompletion";
104+
static constexpr bool UcxPollForRDMACompletionDefault = false;
106105

107106

108107
////////////////////////////////////////////////////////////////////////////////

src/common/ucxtools/UCXSendRecv.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,49 @@ struct dd_ucp_multi_req_v2 {
127127
}
128128
return true;
129129
}
130+
131+
inline
132+
bool wait(dd_ucp_worker &pDDCtx) {
133+
for (;;) {
134+
// check if request is done
135+
if (done()) {
136+
return true;
137+
} else if (ucp_worker_progress(pDDCtx.ucp_worker)) {
138+
continue;
139+
}
140+
141+
// block on the worker
142+
auto status = ucp_worker_arm(pDDCtx.ucp_worker);
143+
144+
if (UCS_OK == status) {
145+
int epoll_ret;
146+
do {
147+
epoll_ret = epoll_wait(pDDCtx.epoll_fd, &pDDCtx.ev, 1, 100);
148+
} while ((epoll_ret == -1) && (errno == EINTR || errno == EAGAIN));
149+
150+
if (epoll_ret == -1) {
151+
EDDLOG("Failed ucp_advance epoll. errno={}", errno);
152+
return done();
153+
}
154+
} else if (UCS_ERR_BUSY == status) {
155+
continue; // could not arm, recheck the request
156+
}
157+
// epoll returned or timeout, recheck the request
158+
}
159+
return done();
160+
}
161+
162+
inline
163+
bool wait_poll(dd_ucp_worker &pDDCtx) const {
164+
for (;;) {
165+
// check if request is done
166+
if (done()) {
167+
return true;
168+
}
169+
while (ucp_worker_progress(pDDCtx.ucp_worker) > 0) { }
170+
}
171+
return true;
172+
}
130173
};
131174

132175

0 commit comments

Comments
 (0)