Skip to content

Commit ef9b1f5

Browse files
committed
stfbuilder: add monitoring
1 parent c13335a commit ef9b1f5

12 files changed

Lines changed: 61 additions & 83 deletions

script/start_Discovery-3FLP-3EPN.sh.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ STF_BUILDER+=" $CHECKRDH"
124124
STF_BUILDER+=" --io-threads $IO_THREADS"
125125
STF_BUILDER+=" --detector $DETECTOR"
126126
STF_BUILDER+=" --detector-rdh 4"
127+
STF_BUILDER+=" --monitoring-rate=1.0"
128+
STF_BUILDER+=" --monitoring-log"
127129

128130
if [[ ! -z $STF_BUILDER_DPL_CHAN ]]; then
129131
STF_BUILDER+=" --dpl-channel-name=$STF_BUILDER_DPL_CHAN"

src/StfBuilder/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ set(EXE_STFB_SOURCES
88
)
99

1010
add_library(StfBuilder_lib OBJECT ${EXE_STFB_SOURCES})
11-
target_link_libraries(StfBuilder_lib base fmqtools common)
11+
target_link_libraries(StfBuilder_lib base fmqtools common monitoring)
1212

1313
add_executable(StfBuilder)
1414

@@ -21,7 +21,7 @@ endif()
2121
target_link_libraries(StfBuilder
2222
PRIVATE
2323
StfBuilder_lib
24-
base fmqtools common
24+
base fmqtools common monitoring
2525
)
2626

2727
install(TARGETS StfBuilder RUNTIME DESTINATION bin)

src/StfBuilder/StfBuilderDevice.cxx

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ void StfBuilderDevice::InitTask()
9696
I().mMaxStfsInPipeline = GetConfig()->GetValue<std::int64_t>(OptionKeyMaxBufferedStfs);
9797
I().mMaxBuiltStfs = GetConfig()->GetValue<std::uint64_t>(OptionKeyMaxBuiltStfs);
9898

99+
// start monitoring
100+
DataDistMonitor::start_datadist(o2::monitoring::tags::Value::StfBuilder, GetConfig()->GetProperty<std::string>("monitoring-backend", ""));
101+
DataDistMonitor::set_rate(GetConfig()->GetValue<float>("monitoring-rate"));
102+
DataDistMonitor::set_log(GetConfig()->GetValue<bool>("monitoring-log"));
103+
99104
// input data handling
100105
ReadoutDataUtils::sSpecifiedDataOrigin = getDataOriginFromOption(
101106
GetConfig()->GetValue<std::string>(OptionKeyStfDetector));
@@ -222,9 +227,6 @@ void StfBuilderDevice::InitTask()
222227
I().mReadoutInterface->start();
223228
}
224229

225-
// info thread
226-
I().mInfoThread = create_thread_member("stfb_info", &StfBuilderDevice::InfoThread, this);
227-
228230
IDDLOG("InitTask() done... ");
229231
}
230232

@@ -257,11 +259,6 @@ void StfBuilderDevice::ResetTask()
257259
I().mOutputThread.join();
258260
}
259261

260-
// wait for the info thread
261-
if (I().mInfoThread.joinable()) {
262-
I().mInfoThread.join();
263-
}
264-
265262
// stop the memory resources very last
266263
MemI().stop();
267264

@@ -270,6 +267,8 @@ void StfBuilderDevice::ResetTask()
270267

271268
void StfBuilderDevice::StfOutputThread()
272269
{
270+
using hres_clock = std::chrono::high_resolution_clock;
271+
273272
std::unique_ptr<InterleavedHdrDataSerializer> lStfSerializer;
274273
std::unique_ptr<StfToDplAdapter> lStfDplAdapter;
275274

@@ -285,27 +284,35 @@ void StfBuilderDevice::StfOutputThread()
285284
}
286285
}
287286

288-
while (I().mState.mRunning) {
289-
using hres_clock = std::chrono::high_resolution_clock;
287+
decltype(hres_clock::now()) lStfStartTime = hres_clock::now();
290288

289+
while (I().mState.mRunning) {
291290
// Get a STF ready for sending
292291
std::unique_ptr<SubTimeFrame> lStf = I().dequeue(eStfSendIn);
293292
if (!lStf) {
293+
DDMON("stfbuilder", "stf_input.rate", 0);
294+
DDMON("stfbuilder", "stf_input.size", 0);
294295
break;
295296
}
296297

297298
// decrement the stf counter
298299
I().mCounters.mNumStfs--;
299300

300-
DDDLOG_RL(2000, "Sending an STF out. stf_id={} stf_size={} unique_equipments={}",
301-
lStf->header().mId, lStf->getDataSize(), lStf->getEquipmentIdentifiers().size());
301+
DDDLOG_RL(5000, "Sending an STF out. stf_id={} stf_size={} unique_equipments={}",
302+
lStf->id(), lStf->getDataSize(), lStf->getEquipmentIdentifiers().size());
302303

303-
// get data size sample
304-
I().mStfSizeMean += (lStf->getDataSize()/64 - I().mStfSizeMean/64);
304+
{
305+
// Output STF frequency
306+
const auto lNow = hres_clock::now();
307+
const auto lStfDur = std::chrono::duration<double>(lNow - lStfStartTime);
308+
lStfStartTime = lNow;
305309

306-
if (!isStandalone()) {
307-
const auto lSendStartTime = hres_clock::now();
310+
DDMON("stfbuilder", "stf_output.rate", (1.0 / lStfDur.count()));
311+
DDMON("stfbuilder", "stf_output.size", lStf->getDataSize());
312+
DDMON("stfbuilder", "stf_output.id", lStf->id());
313+
}
308314

315+
if (!isStandalone()) {
309316
try {
310317

311318
if (!dplEnabled()) {
@@ -335,11 +342,10 @@ void StfBuilderDevice::StfOutputThread()
335342

336343
I().mSentOutStfs++;
337344
I().mSentOutStfsTotal++;
345+
DDMON("stfbuilder", "stf_output.total", I().mSentOutStfsTotal);
338346

339347
const auto lNow = hres_clock::now();
340-
const double lTimeMs = std::max(1e-6, std::chrono::duration<double, std::milli>(lNow - lSendStartTime).count());
341348
I().mSentOutRate = double(I().mSentOutStfs) / std::chrono::duration<double>(lNow - sStartOfStfSending).count();
342-
I().mStfDataTimeSamples += (lTimeMs / 100.0 - I().mStfDataTimeSamples / 100.0);
343349
}
344350

345351
// check if we should exit:
@@ -380,24 +386,7 @@ void StfBuilderDevice::StfOutputThread()
380386
DDDLOG("Exiting StfOutputThread...");
381387
}
382388

383-
void StfBuilderDevice::InfoThread()
384-
{
385-
while (I().mState.mRunning) {
386-
387-
std::this_thread::sleep_for(2s);
388-
389-
if (I().mState.mPaused) {
390-
continue;
391-
}
392389

393-
IDDLOG("SubTimeFrame size_mean={} frequency_mean={:.4} sending_time_ms_mean={:.4} queued_stf={}",
394-
I().mStfSizeMean, (1.0 / I().mReadoutInterface->StfTimeMean()),
395-
I().mStfDataTimeSamples, I().mCounters.mNumStfs);
396-
IDDLOG("SubTimeFrame sent_total={} rate={:.4}", I().mSentOutStfsTotal, I().mSentOutRate);
397-
}
398-
399-
DDDLOG("Exiting Info thread...");
400-
}
401390

402391
bool StfBuilderDevice::ConditionalRun()
403392
{

src/StfBuilder/StfBuilderDevice.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <ConcurrentQueue.h>
2424
#include <Utilities.h>
2525
#include <FmqUtilities.h>
26+
#include <DataDistMonitoring.h>
2627

2728
#include <deque>
2829
#include <memory>
@@ -124,8 +125,12 @@ class StfBuilderDevice : public DataDistDevice
124125
I().mFileSink->makeDirectory();
125126
}
126127

128+
// enable monitoring
129+
DataDistMonitor::enable_datadist(DataDistLogger::sRunNumber, GetConfig()->GetProperty<std::string>("discovery-partition", "-"));
130+
127131
IDDLOG("Entering running state. RunNumber: {}", DataDistLogger::sRunNumberStr);
128132
}
133+
129134
virtual void PostRun() override final {
130135

131136
if (I().mReadoutInterface) {
@@ -138,6 +143,9 @@ class StfBuilderDevice : public DataDistDevice
138143
IDDLOG("Pausing file source.");
139144
}
140145

146+
// disable monitoring
147+
DataDistMonitor::disable_datadist();
148+
141149
IDDLOG("Exiting running state. RunNumber: {}", DataDistLogger::sRunNumberStr);
142150
}
143151

@@ -178,10 +186,6 @@ class StfBuilderDevice : public DataDistDevice
178186
/// File source
179187
std::unique_ptr<SubTimeFrameFileSource> mFileSource;
180188

181-
/// Info thread
182-
std::thread mInfoThread;
183-
uint64_t mStfSizeMean;
184-
double mStfDataTimeSamples;
185189
std::uint64_t mSentOutStfsTotal = 0;
186190
std::uint64_t mSentOutStfs = 0; // used to calculate the rate (pause/resume)
187191
double mSentOutRate = 0.0;

src/StfBuilder/StfBuilderInput.cxx

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,14 +260,12 @@ void StfInputInterface::StfBuilderThread()
260260
}
261261

262262
mSeqStfQueue.push(std::move(*lStf));
263-
{ // MON: data of a new STF received, get the freq and new start time
263+
{
264264
auto lNow = hres_clock::now();
265265
std::chrono::duration<double> lTimeDiff = lNow - lStartSec;
266266
lStartSec = lNow;
267-
mStfTimeMean += (lTimeDiff.count()/100.0 - mStfTimeMean/100.0);
267+
DDMON("stfbuilder", "stf_input.rate", (1.0 / lTimeDiff.count()));
268268
}
269-
} else {
270-
mStfTimeMean *= 2.0;
271269
}
272270
};
273271

@@ -418,9 +416,14 @@ void StfInputInterface::StfSequencerThread()
418416

419417
static constexpr std::uint64_t sMaxMissingStfsForSeq = 2ull * 11234 / 256; // 2 seconds of STFs
420418

419+
std::uint64_t lMissingStfs = 0;
420+
421421
while (mRunning) {
422422
auto lStf = mSeqStfQueue.pop_wait_for(500ms);
423423

424+
// monitoring cumulative metric
425+
DDMON("stfbuilder", "stf_input.missing.total", lMissingStfs);
426+
424427
if (lStf == std::nullopt || !mAcceptingData) {
425428
continue;
426429
}
@@ -456,6 +459,8 @@ void StfInputInterface::StfSequencerThread()
456459
auto lEmptyStf = std::make_unique<SubTimeFrame>(lStfIdIdx);
457460
(*lStf)->setOrigin(SubTimeFrame::Header::Origin::eNull);
458461
mDevice.I().queue(eStfBuilderOut, std::move(lEmptyStf));
462+
463+
lMissingStfs++;
459464
}
460465
} else {
461466
WDDLOG_RL(1000, "READOUT_INTERFACE: Large STF gap. previous_stf_id={} current_stf_id={} num_missing={}",
@@ -465,6 +470,7 @@ void StfInputInterface::StfSequencerThread()
465470
// insert the actual stf
466471
mLastSeqStfId = lCurrId;
467472
mDevice.I().queue(eStfBuilderOut, std::move(*lStf));
473+
468474
continue;
469475
}
470476

src/StfBuilder/StfBuilderInput.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ class StfInputInterface
5151
void StfBuilderThread();
5252
void StfSequencerThread();
5353

54-
double StfTimeMean() const { return mStfTimeMean; }
5554
private:
5655
/// Main SubTimeBuilder O2 device
5756
StfBuilderDevice &mDevice;
@@ -61,8 +60,6 @@ class StfInputInterface
6160
bool mAcceptingData = false;
6261
std::thread mInputThread;
6362

64-
double mStfTimeMean = 1.0;
65-
6663
/// StfBuilding thread and queues
6764
std::unique_ptr<ConcurrentFifo<std::vector<FairMQMessagePtr>>> mBuilderInputQueue = nullptr;
6865
std::unique_ptr<SubTimeFrameReadoutBuilder> mStfBuilder = nullptr;

src/StfBuilder/runStfBuilderDevice.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ int main(int argc, char* argv[])
4040

4141
// Add InfoLogger Options
4242
r.fConfig.AddToCmdLineOptions(impl::DataDistLoggerCtx::getProgramOptions());
43+
// Add Monitoring Options
44+
r.fConfig.AddToCmdLineOptions(DataDistMonitor::getProgramOptions());
4345

4446
bpo::options_description lStfBuilderOptions("StfBuilder options", 120);
4547

src/StfSender/StfSenderDevice.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ void StfSenderDevice::InitTask()
6666
I().mStandalone = GetConfig()->GetValue<bool>(OptionKeyStandalone);
6767

6868
// start monitoring
69-
DataDistMonitor::start_datadist(ProcessType::StfSender, GetConfig()->GetValue<std::string>("monitoring-backend"));
69+
DataDistMonitor::start_datadist(o2::monitoring::tags::Value::StfSender, GetConfig()->GetProperty<std::string>("monitoring-backend"));
7070
DataDistMonitor::set_rate(GetConfig()->GetValue<float>("monitoring-rate"));
7171
DataDistMonitor::set_log(GetConfig()->GetValue<bool>("monitoring-log"));
7272

@@ -284,7 +284,7 @@ void StfSenderDevice::StfReceiverThread()
284284
++lReceivedStfs;
285285
DDDLOG_RL(5000, "StfSender received total of {} STFs.", lReceivedStfs);
286286

287-
IDDLOG_RL(2000, "StfReceiverThread:: SubTimeFrame stf_id={} size={} unique_equip={}",
287+
DDDLOG_RL(2000, "StfReceiverThread:: SubTimeFrame stf_id={} size={} unique_equip={}",
288288
lStf->header().mId, lStf->getDataSize(), lStf->getEquipmentIdentifiers().size());
289289

290290
I().queue(eReceiverOut, std::move(lStf));

src/TfBuilder/TfBuilderDevice.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ void TfBuilderDevice::InitTask()
6262
mTfBufferSize <<= 20; /* input parameter is in MiB */
6363

6464
// start monitoring
65-
DataDistMonitor::start_datadist(ProcessType::TfBuilder, GetConfig()->GetValue<std::string>("monitoring-backend"));
65+
DataDistMonitor::start_datadist(o2::monitoring::tags::Value::TfBuilder, GetConfig()->GetProperty<std::string>("monitoring-backend"));
6666
DataDistMonitor::set_rate(GetConfig()->GetValue<float>("monitoring-rate"));
6767
DataDistMonitor::set_log(GetConfig()->GetValue<bool>("monitoring-log"));
6868

src/TfBuilder/TfBuilderDevice.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ class TfBuilderDevice : public DataDistDevice,
5757
public:
5858
static constexpr const char* OptionKeyStandalone = "stand-alone";
5959
static constexpr const char* OptionKeyTfMemorySize = "tf-memory-size";
60-
6160
static constexpr const char* OptionKeyDplChannelName = "dpl-channel-name";
6261

6362
/// Default constructor

0 commit comments

Comments
 (0)