2020#include < tuple>
2121#include < algorithm>
2222#include < future>
23+ #include < shared_mutex>
2324
2425namespace o2 ::DataDistribution
2526{
@@ -34,6 +35,21 @@ bool TfSchedulerConnManager::start()
3435 return false ; // we'll be called back
3536 }
3637
38+ // start all Connection threads
39+ {
40+ std::unique_lock lRpcLock (mStfSenderClientsLock );
41+ std::unique_lock lConnInfoLock (mConnectInfoLock );
42+
43+ for (auto &[lStfSenderId, lRpcClient] : mStfSenderRpcClients ) {
44+
45+ mConnectThreadInfos .emplace (std::make_pair (lStfSenderId, StfSenderUCXThreadInfo ()));
46+
47+ std::string lThreadName = " ucx_conn_" + lStfSenderId;
48+ mConnectionThreads .push_back (create_thread_member (lThreadName.c_str (),
49+ &TfSchedulerConnManager::ConnectTfBuilderUCXThread, this , lStfSenderId));
50+ }
51+ }
52+
3753 mRunning = true ;
3854
3955 // start gRPC client monitoring thread
@@ -62,6 +78,20 @@ void TfSchedulerConnManager::stop()
6278 mDropFutureWaitThread .join ();
6379 }
6480
81+ // start all Connection threads
82+ {
83+ std::unique_lock lConnInfoLock (mConnectInfoLock );
84+ for (auto &[lStfSenderId, lThreadInfo] : mConnectThreadInfos ) {
85+ lThreadInfo.mConnReqQueue ->stop ();
86+ }
87+
88+ for (auto &lConnThread : mConnectionThreads ) {
89+ if (lConnThread.joinable ()) {
90+ lConnThread.join ();
91+ }
92+ }
93+ }
94+
6595 // delete all rpc clients
6696 mStfSenderRpcClients .stop ();
6797 }
@@ -215,45 +245,64 @@ void TfSchedulerConnManager::connectTfBuilderUCX(const TfBuilderConfigStatus &pT
215245 }
216246
217247 const std::string &lTfBuilderId = pTfBuilderStatus.info ().process_id ();
218- // const auto &lListenAddr = pTfBuilderStatus.ucx_info().listen_ip();
219- // const auto &lListenPort = pTfBuilderStatus.ucx_info().listen_port();
220248
221- std::scoped_lock lLock (mStfSenderClientsLock );
249+ std::shared_lock lLock (mStfSenderClientsLock );
222250
223251 if (!stfSendersReady ()) {
224252 IDDLOG (" TfBuilder UCX Connection: StfSenders gRPC connection not ready." );
225253 pResponse.set_status (ERROR_STF_SENDERS_NOT_READY);
226254 return ;
227255 }
228256
257+ // send message to all StfSenders to connect
258+ bool lConnectionsOk = true ;
259+ pResponse.set_status (OK);
260+
261+ TfBuilderUCXEndpoint lParam;
262+ lParam.set_tf_builder_id (lTfBuilderId);
263+ lParam.mutable_endpoint ()->CopyFrom (pTfBuilderStatus.ucx_info ());
264+
265+ DDDLOG (" connectTfBuilderUCX: starting connections for tf_builder_id={}" , lTfBuilderId);
266+
267+ ConcurrentQueue<std::tuple<bool , std::string, ConnectTfBuilderUCXResponse>> lConnRepQueue;
268+ std::size_t lStfSenderCnt = 0 ;
269+ // queue all connection requests
270+ {
271+ std::shared_lock lStfSenderInfoLock (mConnectInfoLock );
272+ lStfSenderCnt = mConnectThreadInfos .size ();
273+ for (auto &[lStfSenderId, lThreadInfo] : mConnectThreadInfos ) {
274+ (void )lStfSenderId;
275+ auto &lReqQueue = lThreadInfo.mConnReqQueue ;
276+ lReqQueue->push (std::make_unique<StfSenderUCXConnectReq>(lParam, &lConnRepQueue));
277+ }
278+ }
279+
280+ DDDLOG (" connectTfBuilderUCX: starting gRPC client for tf_builder_id={}" , lTfBuilderId);
281+
229282 // Open the gRPC connection to the new TfBuilder (will only add if already does not exist)
230283 if (!newTfBuilderRpcClient (lTfBuilderId)) {
231284 WDDLOG (" TfBuilder gRPC connection error: Cannot open the gRPC connection. tfb_id={}" , lTfBuilderId);
232285 pResponse.set_status (ERROR_GRPC_TF_BUILDER);
233286 return ;
234287 }
235288
236- // send message to all StfSenders to connect
237- bool lConnectionsOk = true ;
238- pResponse.set_status (OK);
289+ // wait for all connection replies
290+ for (std::size_t i = 0 ; i < lStfSenderCnt; i++) {
239291
240- TfBuilderUCXEndpoint lParam;
241- lParam.set_tf_builder_id (lTfBuilderId);
242- lParam.mutable_endpoint ()->CopyFrom (pTfBuilderStatus.ucx_info ());
292+ std::tuple<bool , std::string, ConnectTfBuilderUCXResponse> lRep;
293+ bool lRepOk = lConnRepQueue.pop (lRep);
243294
244- for (auto &[lStfSenderId, lRpcClient] : mStfSenderRpcClients ) {
245- ConnectTfBuilderUCXResponse lResponse;
246- if (!lRpcClient->ConnectTfBuilderUCXRequest (lParam, lResponse).ok ()) {
247- EDDLOG_RL (1000 , " TfBuilder UCX Connection error: gRPC error when connecting StfSender. stfs_id={} tfb_id={}" ,
248- lStfSenderId, lTfBuilderId);
295+ if (!lRepOk && !std::get<0 >(lRep)) {
249296 pResponse.set_status (ERROR_GRPC_STF_SENDER);
250297 lConnectionsOk = false ;
251298 break ;
252299 }
253300
301+ const auto &lStfSenderId = std::get<1 >(lRep);
302+ auto &lResponse = std::get<2 >(lRep);
303+
254304 // check StfSender status
255305 if (lResponse.status () != OK) {
256- EDDLOG_RL (1000 , " TfBuilder UCX Connection error: cannot connect. stfs_id={} tfb_id={}" , lStfSenderId, lTfBuilderId);
257306 pResponse.set_status (lResponse.status ());
258307 lConnectionsOk = false ;
259308 break ;
@@ -270,6 +319,55 @@ void TfSchedulerConnManager::connectTfBuilderUCX(const TfBuilderConfigStatus &pT
270319 disconnectTfBuilderUCX (pTfBuilderStatus, lResponse);
271320 assert (pResponse.status () != OK);
272321 }
322+
323+ DDDLOG (" connectTfBuilderUCX: finished for tf_builder_id={} success={}" , lTfBuilderId, lConnectionsOk);
324+ }
325+
326+ // per FLP thread to connect TfBuilders
327+ void TfSchedulerConnManager::ConnectTfBuilderUCXThread (const std::string lStfSenderId)
328+ {
329+ // Initialize the thread queue
330+ std::unique_lock lThreadLock (mConnectInfoLock );
331+ auto &lConnReqQueue = mConnectThreadInfos [lStfSenderId].mConnReqQueue ;
332+
333+ lThreadLock.unlock ();
334+
335+ std::optional<std::unique_ptr<StfSenderUCXConnectReq>> lStfSenderIdOpt;
336+
337+ DDDLOG (" ConnectTfBuilderUCXThread started for stf_sender_id={}" , lStfSenderId);
338+
339+ while ((lStfSenderIdOpt = lConnReqQueue->pop ()) != std::nullopt ) {
340+
341+ bool lConnectionsOk = true ;
342+
343+ auto &lRpcClient = mStfSenderRpcClients [lStfSenderId];
344+ const auto &lParam = lStfSenderIdOpt.value ()->mRpcReq ;
345+ const auto &lTfBuilderId = lParam.tf_builder_id ();
346+
347+ ConnectTfBuilderUCXResponse lResponse;
348+ if (!lRpcClient->ConnectTfBuilderUCXRequest (lParam, lResponse).ok ()) {
349+ EDDLOG_RL (1000 , " TfBuilder UCX Connection error: gRPC error when connecting StfSender. stfs_id={} tfb_id={}" ,
350+ lStfSenderId, lTfBuilderId);
351+ // pResponse.set_status(ERROR_GRPC_STF_SENDER);
352+ lConnectionsOk = false ;
353+ break ;
354+ }
355+
356+ // check StfSender status
357+ if (lResponse.status () != OK) {
358+ EDDLOG_RL (1000 , " TfBuilder UCX Connection error: cannot connect. stfs_id={} tfb_id={}" , lStfSenderId, lTfBuilderId);
359+ // pResponse.set_status(lResponse.status());
360+ lConnectionsOk = false ;
361+ break ;
362+ }
363+
364+ // send reply
365+ auto mConnRepQueue = lStfSenderIdOpt.value ()->mConnRepQueue ;
366+
367+ if (mConnRepQueue ) {
368+ mConnRepQueue ->push (std::make_tuple (lConnectionsOk, lStfSenderId, std::move (lResponse)));
369+ }
370+ }
273371}
274372
275373void TfSchedulerConnManager::disconnectTfBuilderUCX (const TfBuilderConfigStatus &pTfBuilderStatus, StatusResponse &pResponse /* out*/ )
0 commit comments