Skip to content

Commit a0205a7

Browse files
committed
tf merger: support out-of-order merging
1 parent a6e69c4 commit a0205a7

2 files changed

Lines changed: 64 additions & 47 deletions

File tree

src/TfBuilder/TfBuilderInput.cxx

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ bool TfBuilderInput::start(std::shared_ptr<ConsulTfBuilder> pConfig)
139139
// Start the merger
140140
{
141141
std::unique_lock<std::mutex> lQueueLock(mStfMergerQueueLock);
142-
mStfMergeQueue.clear();
142+
mStfMergeMap.clear();
143+
mStfCount = 0;
143144

144145
// start the merger thread
145146
mStfMergerThread = std::thread(&TfBuilderInput::StfMergerThread, this);
@@ -201,10 +202,10 @@ void TfBuilderInput::stop(std::shared_ptr<ConsulTfBuilder> pConfig)
201202

202203
// Make sure the merger stopped
203204
{
204-
DDLOG(fair::Severity::INFO) << "TfBuilderInput::stop: Stopping the Stf merger thread.";
205+
DDLOG(fair::Severity::INFO) << "TfBuilderInput::stop: Stopping the STF merger thread.";
205206
{
206207
std::unique_lock<std::mutex> lQueueLock(mStfMergerQueueLock);
207-
mStfMergeQueue.clear();
208+
mStfMergeMap.clear();
208209
DDLOG(fair::Severity::INFO) << "TfBuilderInput::stop: Merger queue emptied.";
209210
mStfMergerCondition.notify_all();
210211
}
@@ -228,7 +229,9 @@ void TfBuilderInput::stop(std::shared_ptr<ConsulTfBuilder> pConfig)
228229
/// Receiving thread
229230
void TfBuilderInput::DataHandlerThread(const std::uint32_t pFlpIndex)
230231
{
231-
DDLOG(fair::Severity::INFO) << "Starting input thread for StfSender[" << pFlpIndex << "]...";
232+
DataDistLogger::SetThreadName(fmt::format("Receiver[{}]", pFlpIndex));
233+
234+
DDLOGF(fair::Severity::TRACE, "Starting receiver thread for StfSender[{}]", pFlpIndex);
232235

233236
// Reference to the input channel
234237
auto& lInputChan = *mStfSenderChannels[pFlpIndex];
@@ -247,22 +250,21 @@ void TfBuilderInput::DataHandlerThread(const std::uint32_t pFlpIndex)
247250
const TimeFrameIdType lTfId = lStf->header().mId;
248251

249252
{
250-
static thread_local std::atomic_uint64_t sNumStfs = 0;
251-
if (++sNumStfs % 88 == 0)
252-
DDLOG(fair::Severity::DEBUG) << "Received Stf from flp " << pFlpIndex << " with id " << lTfId << ", total: " << sNumStfs;
253+
static thread_local std::uint64_t sNumStfs = 0;
254+
if (++sNumStfs % 100 == 0) {
255+
DDLOGF(fair::Severity::DEBUG, "Received STF. flp_id={} stf_id={} total={}",
256+
pFlpIndex, lTfId, sNumStfs);
257+
}
253258
}
254259

255260
{
256261
// Push the STF into the merger queue
257262
std::unique_lock<std::mutex> lQueueLock(mStfMergerQueueLock);
258-
mStfMergeQueue.emplace(std::make_pair(lTfId, std::move(lStf)));
259-
260-
// Notify the Merger if enough inputs are collected
261-
// NOW: Merge STFs if exactly |FLP| chunks have been received
262-
// or a next TF started arriving (STFs from previous delayed or not
263-
// available)
264-
// TODO: Find out exactly how many STFs is arriving.
265-
if (mStfMergeQueue.size() >= mNumStfSenders){
263+
264+
mStfMergeMap[lTfId].emplace_back(std::move(lStf));
265+
mStfCount++;
266+
267+
if (mStfMergeMap[lTfId].size() == mNumStfSenders) {
266268
lQueueLock.unlock();
267269
mStfMergerCondition.notify_one();
268270
}
@@ -281,60 +283,63 @@ void TfBuilderInput::StfMergerThread()
281283

282284
std::unique_lock<std::mutex> lQueueLock(mStfMergerQueueLock);
283285

284-
if (mStfMergeQueue.empty()) {
285-
// wait for the signal
286+
if (mStfCount < mNumStfSenders) {
286287
if (std::cv_status::timeout == mStfMergerCondition.wait_for(lQueueLock, 500ms)) {
287-
continue; // should exit?
288+
continue;
288289
}
289290
}
290291

291-
// check for spurious signaling
292-
if (mStfMergeQueue.empty()) {
292+
if (mStfCount < mNumStfSenders) {
293293
continue;
294294
}
295295

296-
// check the merge queue for partial TFs first
297-
const SubTimeFrameIdType lTfId = mStfMergeQueue.begin()->first;
296+
for (auto &lStfInfoIt : mStfMergeMap) {
297+
auto &lStfMetaVec = lStfInfoIt.second;
298+
const auto lStfId = lStfInfoIt.first;
298299

299-
// Case 1: a full TF can be merged
300-
if (mStfMergeQueue.count(lTfId) == mNumStfSenders) {
300+
if (lStfMetaVec.size() < mNumStfSenders) {
301+
continue;
302+
}
301303

302-
auto lStfRange = mStfMergeQueue.equal_range(lTfId);
303-
assert(std::distance(lStfRange.first, lStfRange.second) == mNumStfSenders);
304+
if (lStfMetaVec.size() > mNumStfSenders) {
305+
DDLOGF(fair::Severity::ERROR,
306+
"StfMerger: number of STFs is larger than expected. stf_id={:d} num_stfs={:d} num_stf_senders={:d}",
307+
lStfId, lStfMetaVec.size(), mNumStfSenders);
308+
}
304309

305-
auto lStfCount = 1UL; // start from the first element
306-
std::unique_ptr<SubTimeFrame> lTf = std::move(lStfRange.first->second);
310+
// merge the current TF!
311+
const auto lBuildDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
312+
lStfMetaVec.rbegin()->mTimeReceived - lStfMetaVec.begin()->mTimeReceived);
307313

308-
for (auto lStfIter = std::next(lStfRange.first); lStfIter != lStfRange.second; ++lStfIter) {
309-
// Add them all up
310-
lTf->mergeStf(std::move(lStfIter->second));
311-
lStfCount++;
314+
// start from the first element (using it as the seed for the TF)
315+
std::unique_ptr<SubTimeFrame> lTf = std::move(lStfMetaVec.begin()->mStf);
316+
317+
for (auto lStfIter = std::next(lStfMetaVec.begin()); lStfIter != lStfMetaVec.end(); ++lStfIter) {
318+
// Add them all up
319+
lTf->mergeStf(std::move(lStfIter->mStf));
312320
}
313321

314-
if (lStfCount < mNumStfSenders) {
315-
DDLOG(fair::Severity::WARN) << "STF MergerThread: merging incomplete TF[" << lTf->header().mId << "]: contains "
316-
<< lStfCount << " instead of " << mNumStfSenders << " SubTimeFrames";
322+
{
323+
static std::uint64_t sNumBuiltTfs = 0;
324+
if (++sNumBuiltTfs % 10 == 0) {
325+
DDLOGF(fair::Severity::DEBUG, "Building of TF completed. tf_id={:d} duration_ms={} total_tf={:d}",
326+
lStfId, lBuildDurationMs.count(), sNumBuiltTfs);
327+
}
317328
}
318329

319330
// remove consumed STFs from the merge queue
320-
mStfMergeQueue.erase(lStfRange.first, lStfRange.second);
331+
mStfMergeMap.erase(lStfId);
332+
mStfCount -= mNumStfSenders;
321333

322334
// account the size of received TF
323335
mRpc->recordTfBuilt(*lTf);
324336

325337
// Queue out the TF for consumption
326338
mDevice.queue(mOutStage, std::move(lTf));
327-
}
328-
// else if (mStfMergeQueue.size() > (50 * mNumStfSenders)) {
329-
// // FIXME: for now, discard incomplete TFs
330-
// DDLOG(fair::Severity::WARN) << "Unbounded merge queue size: " << mStfMergeQueue.size();
331-
332-
// const auto lDroppedStfs = mStfMergeQueue.count(lTfId);
333339

334-
// mStfMergeQueue.erase(lTfId);
335-
336-
// DDLOG(fair::Severity::WARN) << "Dropping oldest incomplete TF... (" << lDroppedStfs << " STFs)";
337-
// }
340+
// break from the for loop and try again
341+
break;
342+
}
338343
}
339344

340345
DDLOG(fair::Severity::INFO) << "Exiting STF merger thread...";

src/TfBuilder/TfBuilderInput.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,19 @@ class TfBuilderInput
7575
std::thread mStfMergerThread;
7676
std::mutex mStfMergerQueueLock;
7777
std::condition_variable mStfMergerCondition;
78-
std::multimap<TimeFrameIdType, std::unique_ptr<SubTimeFrame>> mStfMergeQueue;
78+
79+
struct ReceivedStfMeta {
80+
std::chrono::time_point<std::chrono::system_clock> mTimeReceived;
81+
std::unique_ptr<SubTimeFrame> mStf;
82+
83+
ReceivedStfMeta(std::unique_ptr<SubTimeFrame>&& pStf)
84+
: mTimeReceived(std::chrono::system_clock::now()),
85+
mStf(std::move(pStf))
86+
{}
87+
};
88+
89+
std::map<TimeFrameIdType, std::vector<ReceivedStfMeta>> mStfMergeMap;
90+
std::uint64_t mStfCount = 0;
7991

8092
/// Output pipeline stage
8193
unsigned mOutStage;

0 commit comments

Comments
 (0)