Skip to content

Commit bd883ca

Browse files
committed
datadist: monitoring improvements; more accurate rates
+ fix for env id
1 parent 5be8dfe commit bd883ca

File tree

14 files changed

+312
-207
lines changed

14 files changed

+312
-207
lines changed

src/StfBuilder/StfBuilderDevice.cxx

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,11 @@ void StfBuilderDevice::InitTask()
9898
I().mMaxBuiltStfs = GetConfig()->GetValue<std::uint64_t>(OptionKeyMaxBuiltStfs);
9999

100100
// partition id
101-
I().mPartitionId = Config::getPartitionOption(*GetConfig()).value_or(DataDistLogger::sPartitionIdStr);
102-
if (DataDistLogger::sPartitionIdStr.empty() && !I().mPartitionId.empty()) {
103-
DataDistLogger::sPartitionIdStr = I().mPartitionId;
104-
impl::DataDistLoggerCtx::InitInfoLogger();
101+
I().mPartitionId = Config::getPartitionOption(*GetConfig()).value_or("");
102+
if (I().mPartitionId.empty()) {
103+
EDDLOG("Partition id is not provided during InitTask(). Check command line or ECS parameters. Exiting.");
104+
ChangeState(fair::mq::Transition::ErrorFound);
105+
return;
105106
}
106107

107108
// start monitoring
@@ -130,17 +131,18 @@ void StfBuilderDevice::InitTask()
130131
// check run type
131132
if (ReadoutDataUtils::sRunType == ReadoutDataUtils::RunType::eInvalid) {
132133
EDDLOG("Run type paramter must be correctly set.");
133-
throw std::logic_error("Run type paramter must be correctly set.");
134+
ChangeState(fair::mq::Transition::ErrorFound);
135+
return;
134136
}
135137

136138
// check run type
137139
if (ReadoutDataUtils::sRunType == ReadoutDataUtils::RunType::eTopology) {
138140
if (! ((ReadoutDataUtils::sSpecifiedDataOrigin == o2::header::gDataOriginITS) ||
139141
(ReadoutDataUtils::sSpecifiedDataOrigin == o2::header::gDataOriginMFT))) {
140-
141142
EDDLOG("Run type paramter 'topology' is supported only for ITS and MFT. Please specify the detector option. detector={}",
142143
ReadoutDataUtils::sSpecifiedDataOrigin.as<std::string>());
143-
throw std::logic_error("Run type paramter 'topology' is supported only for ITS and MFT. Please specify the detector option.");
144+
ChangeState(fair::mq::Transition::ErrorFound);
145+
return;
144146
}
145147
}
146148

@@ -166,12 +168,14 @@ void StfBuilderDevice::InitTask()
166168

167169
// File sink
168170
if (!I().mFileSink->loadVerifyConfig(*(this->GetConfig()))) {
169-
std::this_thread::sleep_for(1s); exit(-1);
171+
ChangeState(fair::mq::Transition::ErrorFound);
172+
return;
170173
}
171174

172175
// File source
173176
if (!I().mFileSource->loadVerifyConfig(*(this->GetConfig()))) {
174-
std::this_thread::sleep_for(1s); exit(-1);
177+
ChangeState(fair::mq::Transition::ErrorFound);
178+
return;
175179
}
176180

177181
// Discovery. Verify other parameters when running online and !standalone
@@ -185,17 +189,6 @@ void StfBuilderDevice::InitTask()
185189
lStatus.mutable_info()->set_type(StfBuilder);
186190
lStatus.mutable_info()->set_process_state(BasicInfo::NOT_RUNNING);
187191
lStatus.mutable_info()->set_process_id(Config::getIdOption(StfBuilder, *GetConfig(), lConsulRequired));
188-
189-
// wait for "partition-id"
190-
while (!Config::getPartitionOption(*GetConfig())) {
191-
WDDLOG_RL(1000, "StfBuilder waiting on 'discovery-partition' config parameter.");
192-
std::this_thread::sleep_for(250ms);
193-
}
194-
if (I().mPartitionId.empty()) {
195-
EDDLOG("StfBuilder 'discovery-partition' parameter not set.");
196-
std::this_thread::sleep_for(1s); exit(-1);
197-
}
198-
I().mPartitionId = Config::getPartitionOption(*GetConfig()).value();
199192
lStatus.mutable_partition()->set_partition_id(I().mPartitionId);
200193
I().mDiscoveryConfig->write();
201194
}
@@ -209,14 +202,16 @@ void StfBuilderDevice::InitTask()
209202
(ReadoutDataUtils::sSpecifiedDataOrigin == o2::header::gDataOriginAny)) {
210203
EDDLOG("Detector string parameter must be specified when receiving the data from the "
211204
"readout and not using RDHv6 or greater.");
212-
std::this_thread::sleep_for(1s); exit(-1);
205+
ChangeState(fair::mq::Transition::ErrorFound);
206+
return;
213207
} else {
214208
IDDLOG("READOUT INTERFACE: Configured detector: {}", ReadoutDataUtils::sSpecifiedDataOrigin.as<std::string>());
215209
}
216210

217211
if (ReadoutDataUtils::sRdhVersion == ReadoutDataUtils::RdhVersion::eRdhInvalid) {
218212
EDDLOG("The RDH version must be specified when receiving data from readout.");
219-
std::this_thread::sleep_for(1s); exit(-1);
213+
ChangeState(fair::mq::Transition::ErrorFound);
214+
return;
220215
} else {
221216
IDDLOG("READOUT INTERFACE: Configured RDHv{}", ReadoutDataUtils::sRdhVersion);
222217
RDHReader::Initialize(unsigned(ReadoutDataUtils::sRdhVersion));
@@ -254,7 +249,8 @@ void StfBuilderDevice::InitTask()
254249
GetChannel(I().mInputChannelName);
255250
} catch(std::exception &) {
256251
EDDLOG("Input channel not configured (from o2-readout-exe) and not running with file source enabled.");
257-
std::this_thread::sleep_for(1s); exit(-1);
252+
ChangeState(fair::mq::Transition::ErrorFound);
253+
return;
258254
}
259255
}
260256

@@ -264,7 +260,8 @@ void StfBuilderDevice::InitTask()
264260
}
265261
} catch(std::exception &e) {
266262
EDDLOG("Output channel (to DPL or StfSender) must be configured if not running in stand-alone mode.");
267-
std::this_thread::sleep_for(1s); exit(-1);
263+
ChangeState(fair::mq::Transition::ErrorFound);
264+
return;
268265
}
269266
}
270267

@@ -337,6 +334,8 @@ void StfBuilderDevice::StfOutputThread()
337334

338335
decltype(hres_clock::now()) lStfStartTime = hres_clock::now();
339336

337+
DDMON_RATE("stfbuilder", "stf_output", 0.0);
338+
340339
while (I().mState.mRunning) {
341340
// Get a STF ready for sending, or nullopt
342341
auto lStfOpt = I().dequeue_for(eStfSendIn, 100ms);
@@ -358,13 +357,12 @@ void StfBuilderDevice::StfOutputThread()
358357

359358
lShouldSendEos = true;
360359
if (lStfOpt == std::nullopt) {
361-
DDMON("stfbuilder", "stf_output.rate", 0);
362-
DDMON("stfbuilder", "stf_output.size", 0);
363360
DDMON("stfbuilder", "data_output.rate", 0);
364361
continue;
365362
}
366363

367364
auto &lStf = lStfOpt.value();
365+
DDMON_RATE("stfbuilder", "stf_output", lStf->getDataSize());
368366

369367
// decrement the stf counter
370368
I().mCounters.mNumStfs--;
@@ -379,8 +377,6 @@ void StfBuilderDevice::StfOutputThread()
379377
lStfStartTime = lNow;
380378

381379
const auto lRate = 1.0 / std::max(lStfDur.count(), 0.00001);
382-
DDMON("stfbuilder", "stf_output.rate", lRate);
383-
DDMON("stfbuilder", "stf_output.size", lStf->getDataSize());
384380
DDMON("stfbuilder", "stf_output.id", lStf->id());
385381
DDMON("stfbuilder", "data_output.rate", (lRate * lStf->getDataSize()));
386382
}

src/StfBuilder/StfBuilderInput.cxx

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,7 @@ void StfInputInterface::StfBuilderThread()
231231
};
232232

233233
const auto cStfDataWaitFor = 2s;
234-
235-
using hres_clock = std::chrono::high_resolution_clock;
236-
237-
std::chrono::time_point<hres_clock, std::chrono::duration<double>> lStartSec = hres_clock::now();
234+
DDMON_RATE("stfbuilder", "stf_input", 0.0);
238235

239236
while (mRunning) {
240237

@@ -251,13 +248,9 @@ void StfInputInterface::StfBuilderThread()
251248
}
252249

253250
(*lStf)->setOrigin(SubTimeFrame::Header::Origin::eReadout);
251+
252+
DDMON_RATE("stfbuilder", "stf_input", (*lStf)->getDataSize());
254253
mSeqStfQueue.push(std::move(*lStf));
255-
{
256-
auto lNow = hres_clock::now();
257-
std::chrono::duration<double> lTimeDiff = lNow - lStartSec;
258-
lStartSec = lNow;
259-
DDMON("stfbuilder", "stf_input.rate", (1.0 / lTimeDiff.count()));
260-
}
261254
}
262255
};
263256

@@ -407,7 +400,6 @@ void StfInputInterface::StfBuilderThread()
407400
void StfInputInterface::TopologicalStfBuilderThread()
408401
{
409402
using namespace std::chrono_literals;
410-
using hres_clock = std::chrono::high_resolution_clock;
411403

412404
bool lStarted = false;
413405
std::vector<FairMQMessagePtr> lReadoutMsgs(1U << 20);
@@ -443,43 +435,46 @@ void StfInputInterface::TopologicalStfBuilderThread()
443435
std::uint64_t(50000)
444436
);
445437

446-
IDDLOG("Aggregating {} pages for each link in topological STF building.", lNumPagesInThresholdScanStf);
438+
// get a flag for cutting topo TFs on a new orbit
439+
const bool lCutOnNewOrbit = mDiscoveryConfig->getBoolParam(StartTopologicalStfOnNewOrbitfKey, StartTopologicalStfOnNewOrbitDefault);
440+
441+
IDDLOG("Aggregating pages for each link in topological STF building. num_pages={} only_new_orbit={}",
442+
lNumPagesInThresholdScanStf, lCutOnNewOrbit);
447443

448444
// insert and mask the feeid. Return the Stf if aggregation is reached
449-
auto lInsertTopoWithFeeIdMasking = [&lStfBuilder, lFeeIdMask, &lNumPagesInThresholdScanStf] (
445+
// Note call again if lCutOnNewOrbit is not 0
446+
auto lInsertTopoWithFeeIdMasking = [&lStfBuilder, lFeeIdMask, lNumPagesInThresholdScanStf, lCutOnNewOrbit] (
450447
const header::DataOrigin &pDataOrigin,
451448
const header::DataHeader::SubSpecificationType &pSubSpec,
452449
const ReadoutSubTimeframeHeader &pRdoHeader,
453-
const FairMQParts::iterator pStartHbf,
454-
const std::size_t pInsertCnt) {
450+
FairMQParts::iterator &pStartHbf /* in/out */,
451+
std::size_t &pInsertCnt /* in/out */) -> std::optional<std::unique_ptr<SubTimeFrame> > {
455452

456453
// mask the subspecification if the fee mode is used
457454
auto lMaskedSubspec = pSubSpec;
458455
if (ReadoutDataUtils::SubSpecMode::eFeeId == ReadoutDataUtils::sRawDataSubspectype) {
459456
lMaskedSubspec &= lFeeIdMask;
460457
}
461458

462-
return lStfBuilder.addTopoStfData(pDataOrigin, lMaskedSubspec, pRdoHeader, pStartHbf, pInsertCnt, lNumPagesInThresholdScanStf);
459+
return lStfBuilder.addTopoStfData(pDataOrigin, lMaskedSubspec, pRdoHeader, pStartHbf, pInsertCnt,
460+
lNumPagesInThresholdScanStf, lCutOnNewOrbit);
463461
};
464462

465463
const auto cStfDataWaitFor = 500ms;
466-
std::chrono::time_point<hres_clock, std::chrono::duration<double>> lStartSec = hres_clock::now();
464+
DDMON_RATE("stfbuilder", "stf_input", 0.0);
467465

468466
auto queueStf = [&] (std::unique_ptr<SubTimeFrame> pStf) {
467+
// static auto lStartSec = clock::now();
468+
469469
static std::uint64_t sStfId = 0;
470470
if (pStf) {
471471
// make sure we queue STFs in ascending order
472472
sStfId += 1;
473473
pStf->updateId(sStfId);
474474
pStf->setOrigin(SubTimeFrame::Header::Origin::eReadoutTopology);
475475

476+
DDMON_RATE("stfbuilder", "stf_input", pStf->getDataSize());
476477
mDevice.I().queue(eStfBuilderOut, std::move(pStf));
477-
{
478-
auto lNow = hres_clock::now();
479-
std::chrono::duration<double> lTimeDiff = lNow - lStartSec;
480-
lStartSec = lNow;
481-
DDMON("stfbuilder", "stf_input.rate", (1.0 / lTimeDiff.count()));
482-
}
483478
}
484479
};
485480

@@ -497,7 +492,6 @@ void StfInputInterface::TopologicalStfBuilderThread()
497492
};
498493

499494
while (mRunning) {
500-
501495
// Equipment ID for the HBFrames (from the header)
502496
lReadoutMsgs.clear();
503497

@@ -558,9 +552,13 @@ void StfInputInterface::TopologicalStfBuilderThread()
558552
std::size_t lInsertCnt = lReadoutMsgs.size() - 1;
559553

560554
// insert all HBFs
561-
auto lStfOpt = lInsertTopoWithFeeIdMasking(lDataOrigin, lSubSpecification, lReadoutHdr, lStartHbf, lInsertCnt);
562-
if (lStfOpt) {
563-
queueStf(std::move(*lStfOpt));
555+
while (lInsertCnt > 0) {
556+
auto lStfOpt = lInsertTopoWithFeeIdMasking(lDataOrigin, lSubSpecification, lReadoutHdr, lStartHbf, lInsertCnt);
557+
if (lStfOpt) {
558+
queueStf(std::move(*lStfOpt));
559+
} else {
560+
assume(lInsertCnt == 0);
561+
}
564562
}
565563
}
566564

0 commit comments

Comments
 (0)