Skip to content

Commit a0f84c9

Browse files
committed
monitoring: enable metrics from StfSender and TfBuilder
1 parent 02726bc commit a0f84c9

22 files changed

Lines changed: 551 additions & 93 deletions

cmake/FindAliceO2.cmake

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@ find_library(AliceO2_LIBRARY_HEADERS NAMES O2Headers HINTS ${O2_ROOT}/lib ENV LD
2525
find_library(AliceO2_LIBRARY_FRAMEWORK NAMES O2Framework HINTS ${O2_ROOT}/lib ENV LD_LIBRARY_PATH)
2626

2727
set(AliceO2_LIBRARIES
28-
${AliceO2_LIBRARY_O2DEVICE}
2928
${AliceO2_LIBRARY_HEADERS}
3029
${AliceO2_LIBRARY_FRAMEWORK}
3130
)

script/start_Discovery-3FLP-3EPN.sh.in

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -145,11 +145,15 @@ STF_SENDER="StfSender"
145145
STF_SENDER+=" --discovery-net-if=$FLP_NETIF"
146146
STF_SENDER+=" --discovery-partition=$PARTITION"
147147
STF_SENDER+=" --mq-config $chainConfig"
148+
STF_SENDER+=" --monitoring-rate=5.0"
149+
STF_SENDER+=" --monitoring-log"
148150

149151
TF_BUILDER="TfBuilder"
150152
TF_BUILDER+=" --discovery-net-if=$EPN_NETIF"
151153
TF_BUILDER+=" --discovery-partition=$PARTITION"
152154
TF_BUILDER+=" --mq-config $chainConfig"
155+
TF_BUILDER+=" --monitoring-rate=1.0"
156+
TF_BUILDER+=" --monitoring-log"
153157

154158
if [[ ! -z $TF_BUILDER_DPL_CHAN ]]; then
155159
TF_BUILDER+=" --dpl-channel-name=$TF_BUILDER_DPL_CHAN"

src/StfSender/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ set(EXE_STFS_SOURCES
99
)
1010

1111
add_library(StfSender_lib OBJECT ${EXE_STFS_SOURCES})
12-
target_link_libraries(StfSender_lib base fmqtools common discovery)
12+
target_link_libraries(StfSender_lib base fmqtools common discovery monitoring)
1313

1414
add_executable(StfSender)
1515

@@ -22,7 +22,7 @@ endif()
2222
target_link_libraries(StfSender
2323
PRIVATE
2424
StfSender_lib
25-
base fmqtools common discovery
25+
base fmqtools common discovery monitoring
2626
)
2727

2828
install(TARGETS StfSender RUNTIME DESTINATION bin)

src/StfSender/StfSenderDevice.cxx

Lines changed: 40 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,11 @@ void StfSenderDevice::InitTask()
6565
I().mInputChannelName = GetConfig()->GetValue<std::string>(OptionKeyInputChannelName);
6666
I().mStandalone = GetConfig()->GetValue<bool>(OptionKeyStandalone);
6767

68+
// start monitoring
69+
DataDistMonitor::start_datadist(ProcessType::StfSender, GetConfig()->GetValue<std::string>("monitoring-backend"));
70+
DataDistMonitor::set_rate(GetConfig()->GetValue<float>("monitoring-rate"));
71+
DataDistMonitor::set_log(GetConfig()->GetValue<bool>("monitoring-log"));
72+
6873
if (!standalone()) {
6974
// Discovery
7075
I().mDiscoveryConfig = std::make_shared<ConsulStfSender>(ProcessType::StfSender, Config::getEndpointOption(*GetConfig()));
@@ -77,11 +82,17 @@ void StfSenderDevice::InitTask()
7782

7883
// wait for "partition-id"
7984
while (!Config::getPartitionOption(*GetConfig())) {
80-
WDDLOG("TfBuilder waiting on 'discovery-partition' config parameter.");
85+
WDDLOG("StfSender waiting on 'discovery-partition' config parameter.");
8186
std::this_thread::sleep_for(1s);
8287
}
8388

84-
lStatus.mutable_partition()->set_partition_id(*Config::getPartitionOption(*GetConfig()));
89+
I().mPartitionId = Config::getPartitionOption(*GetConfig()).value_or("");
90+
if (I().mPartitionId.empty()) {
91+
WDDLOG("StfSender 'discovery-partition' parameter not set.");
92+
std::this_thread::sleep_for(1s); exit(-1);
93+
}
94+
95+
lStatus.mutable_partition()->set_partition_id(I().mPartitionId);
8596
I().mDiscoveryConfig->write();
8697
}
8798

@@ -144,11 +155,19 @@ void StfSenderDevice::PreRun()
144155
I().mDiscoveryConfig->write();
145156
}
146157

158+
// reset counters
159+
if (I().mOutputHandler) {
160+
I().mOutputHandler->resetCounters();
161+
}
162+
147163
// make directory for file sink
148164
if (I().mFileSink) {
149165
I().mFileSink->makeDirectory();
150166
}
151167

168+
// enable monitoring
169+
DataDistMonitor::enable_datadist(DataDistLogger::sRunNumber, I().mPartitionId);
170+
152171
// start accepting data
153172
I().mAcceptingData = true;
154173

@@ -160,13 +179,21 @@ void StfSenderDevice::PostRun()
160179
// stop accepting data
161180
I().mAcceptingData = false;
162181

182+
// disable monitoring
183+
DataDistMonitor::disable_datadist();
184+
163185
// update running state
164186
if (!standalone() && I().mDiscoveryConfig) {
165187
auto& lStatus = I().mDiscoveryConfig->status();
166188
lStatus.mutable_info()->set_process_state(BasicInfo::NOT_RUNNING);
167189
I().mDiscoveryConfig->write();
168190
}
169191

192+
// reset counters
193+
if (I().mOutputHandler) {
194+
I().mOutputHandler->resetCounters();
195+
}
196+
170197
IDDLOG("Exiting running state. RunNumber: {}", DataDistLogger::sRunNumberStr);
171198
}
172199

@@ -204,6 +231,9 @@ void StfSenderDevice::ResetTask()
204231
I().mTfSchedulerRpcClient.stop();
205232
}
206233

234+
// stop monitoring
235+
DataDistMonitor::stop_datadist();
236+
207237
DDDLOG("ResetTask() done.");
208238
}
209239

@@ -218,7 +248,7 @@ void StfSenderDevice::StfReceiverThread()
218248
DplToStfAdapter lStfReceiver;
219249
std::unique_ptr<SubTimeFrame> lStf;
220250

221-
auto lStfStartTime = hres_clock::now();
251+
decltype(hres_clock::now()) lStfStartTime = hres_clock::now();
222252

223253
while (running()) {
224254
try {
@@ -235,18 +265,20 @@ void StfSenderDevice::StfReceiverThread()
235265
if (lStf) {
236266
WDDLOG_RL(1000, "StfSender: received STF but not in the running state.");
237267
}
238-
std::this_thread::sleep_for(10ms);
268+
DDMON("stfsender", "stf_input.rate", 0.0);
269+
DDMON("stfsender", "stf_input.size", 0.0);
270+
std::this_thread::sleep_for(20ms);
239271
continue;
240272
}
241273

242274
{ // Input STF frequency
243275
const auto lNow = hres_clock::now();
244276
const auto lStfDur = std::chrono::duration<double>(lNow - lStfStartTime);
245277
lStfStartTime = lNow;
246-
I().mStfTimeMean += (lStfDur.count()/100.0 - I().mStfTimeMean/100.0);
247278

248-
// get data size
249-
I().mStfSizeMean += (lStf->getDataSize()/128 - I().mStfSizeMean/128);
279+
DDMON("stfsender", "stf_input.rate", (1.0 / lStfDur.count()));
280+
DDMON("stfsender", "stf_input.size", lStf->getDataSize());
281+
DDMON("stfsender", "stf_input.id", (uint64_t)lStf->header().mId);
250282
}
251283

252284
++lReceivedStfs;
@@ -265,11 +297,10 @@ void StfSenderDevice::StfReceiverThread()
265297
void StfSenderDevice::InfoThread()
266298
{
267299
while (running()) {
268-
IDDLOG("StfSender: SubTimeFrame size_mean={} in_frequency_mean={:.4}", I().mStfSizeMean, (1.0 / I().mStfTimeMean));
269300
if (!standalone()) {
270301
const auto lCounters = I().mOutputHandler->getCounters();
271302

272-
IDDLOG("StfSender: SubTimeFrame queued_stf_num={} queued_stf_size={} sending_stf_num={} sending_stf_size={} ",
303+
DDDLOG("StfSender: SubTimeFrame queued_stf_num={} queued_stf_size={} sending_stf_num={} sending_stf_size={} ",
273304
lCounters.mBuffered.mCnt, lCounters.mBuffered.mSize,
274305
lCounters.mInSending.mCnt, lCounters.mInSending.mSize);
275306
}

src/StfSender/StfSenderDevice.h

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
#include <SubTimeFrameFileSink.h>
2424
#include <Utilities.h>
2525
#include <FmqUtilities.h>
26+
#include <DataDistMonitoring.h>
2627

2728
#include <thread>
2829
#include <vector>
@@ -87,11 +88,12 @@ class StfSenderDevice : public DataDistDevice
8788
/// Configuration
8889
std::string mInputChannelName;
8990
bool mStandalone = false;
91+
std::string mPartitionId;
9092

9193
/// Discovery configuration
9294
std::shared_ptr<ConsulStfSender> mDiscoveryConfig;
9395

94-
/// Scheculer RPC client
96+
/// Scheduler RPC client
9597
TfSchedulerRpcClient mTfSchedulerRpcClient;
9698

9799
/// Receiver threads
@@ -110,8 +112,6 @@ class StfSenderDevice : public DataDistDevice
110112

111113
/// Info thread
112114
std::thread mInfoThread;
113-
std::uint64_t mStfSizeMean = 0;
114-
double mStfTimeMean = 50.0;
115115

116116
unsigned getNextPipelineStage(unsigned pStage) final
117117
{

src/StfSender/StfSenderOutput.cxx

Lines changed: 26 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -250,7 +250,7 @@ void StfSenderOutput::StfSchedulerThread()
250250
auto [it, ins] = mScheduledStfMap.try_emplace(lStfId, std::move(lStf));
251251
if (!ins) {
252252
(void)it;
253-
EDDLOG_RL(500, "StfSchedulerThread: Stf already scheduled! Skipping the duplicate. stf_id={}", lStfId);
253+
EDDLOG_RL(500, "StfSchedulerThread: Stf is already scheduled! Skipping the duplicate. stf_id={}", lStfId);
254254
continue;
255255
}
256256

@@ -303,15 +303,10 @@ void StfSenderOutput::sendStfToTfBuilder(const std::uint64_t pStfId, const std::
303303
} else {
304304
pRes.set_status(StfDataResponse::DATA_DROPPED_SCHEDULER);
305305
}
306-
return;
307-
}
308-
309-
// check if it is drop request from the scheduler
310-
if (pTfBuilderId == "-1") {
306+
} else if (pTfBuilderId == "-1") { // check if it is drop request from the scheduler
311307
pRes.set_status(StfDataResponse::DATA_DROPPED_SCHEDULER);
312308
mDropQueue.push(std::move(lStfIter->second));
313309
mScheduledStfMap.erase(lStfIter);
314-
return;
315310
} else {
316311
auto lTfBuilderIter = mOutputMap.find(pTfBuilderId);
317312
if (lTfBuilderIter == mOutputMap.end()) {
@@ -326,12 +321,25 @@ void StfSenderOutput::sendStfToTfBuilder(const std::uint64_t pStfId, const std::
326321
// we clean the buffer when data is sent
327322
pRes.set_status(StfDataResponse::OK);
328323

329-
mCounters.mInSending.mSize += lStfIter->second->getDataSize();
324+
const auto lStfSize = lStfIter->second->getDataSize();
325+
const auto lStfId = lStfIter->second->id();
326+
mCounters.mInSending.mSize += lStfSize;
330327
mCounters.mInSending.mCnt += 1;
331328

332329
auto lStfNode = mScheduledStfMap.extract(lStfIter);
333330
lTfBuilderIter->second.mStfQueue->push(std::move(lStfNode.mapped()));
334331

332+
// monitoring
333+
{
334+
using hres_clock = std::chrono::high_resolution_clock;
335+
static decltype(hres_clock::now()) sStfStartTime = hres_clock::now();
336+
const auto lDuration = std::chrono::duration<double>(hres_clock::now() - sStfStartTime);
337+
DDMON("stfsender", "stf_output.stf_id", lStfId);
338+
DDMON("stfsender", "stf_output.stf_rate", (1.0 / lDuration.count()));
339+
DDMON("stfsender", "stf_output.stf_size", lStfSize);
340+
sStfStartTime = hres_clock::now();
341+
}
342+
335343
if (lTfBuilderIter->second.mStfQueue->size() > 50) {
336344
WDDLOG_RL(1000, "SendToTfBuilder: STF queue backlog. queue_size={}", lTfBuilderIter->second.mStfQueue->size());
337345
}
@@ -390,11 +398,18 @@ void StfSenderOutput::DataHandlerThread(const std::string pTfBuilderId)
390398
mCounters.mBuffered.mCnt -= 1;
391399
mCounters.mInSending.mSize -= lStfSize;
392400
mCounters.mInSending.mCnt -= 1;
401+
mCounters.mTotalSent.mSize += lStfSize;
402+
mCounters.mTotalSent.mCnt += 1;
393403

394404
if (mCounters.mInSending.mCnt > 100) {
395405
DDDLOG_RL(2000, "DataHandlerThread: Number of buffered STFs. tfb_id={} num_stfs={} num_stf_total={} size_stf_total={}",
396406
pTfBuilderId, lInputStfQueue->size(), mCounters.mInSending.mCnt, mCounters.mInSending.mSize);
397407
}
408+
409+
DDMON("stfsender", "stf_output.sent_count", mCounters.mTotalSent.mCnt);
410+
DDMON("stfsender", "stf_output.sent_size", mCounters.mTotalSent.mSize);
411+
DDMON("stfsender", "buffered.stf_cnt", mCounters.mBuffered.mCnt);
412+
DDMON("stfsender", "buffered.stf_size", mCounters.mBuffered.mSize);
398413
}
399414
}
400415

@@ -426,6 +441,9 @@ void StfSenderOutput::StfDropThread()
426441
std::scoped_lock lLock(mScheduledStfMapLock);
427442
mCounters.mBuffered.mSize -= lStfSize;
428443
mCounters.mBuffered.mCnt -= 1;
444+
445+
DDMON("stfsender", "buffered.stf_size", mCounters.mBuffered.mSize);
446+
DDMON("stfsender", "buffered.stf_cnt", mCounters.mBuffered.mCnt);
429447
}
430448
}
431449

src/StfSender/StfSenderOutput.h

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,12 @@ class StfSenderOutput
4444
std::uint64_t mSize = 0;
4545
std::uint32_t mCnt = 0;
4646
} mInSending;
47+
48+
// Total sent: running sum of the data size (mSize) and the number (mCnt) of STFs
// accounted as sent. Both fields are incremented in StfSenderOutput::DataHandlerThread
// and reported through DDMON as "stf_output.sent_size" / "stf_output.sent_count".
// NOTE(review): alignas(128) presumably keeps this counter group on its own cache
// line(s), matching the sibling counter groups — confirm.
49+
struct alignas(128) {
50+
std::uint64_t mSize = 0;
51+
// 64-bit counter, whereas mInSending.mCnt is declared as std::uint32_t
std::uint64_t mCnt = 0;
52+
} mTotalSent;
4753
};
4854

4955
StfSenderOutput() = delete;
@@ -74,6 +80,13 @@ class StfSenderOutput
7480
return mCounters;
7581
}
7682

83+
/// Snapshot-and-clear of the output counters.
/// Copies the current mCounters, replaces mCounters with a default-constructed
/// StdSenderOutputCounters (all members back to their in-class initializers),
/// and returns the copy taken before the reset.
/// NOTE(review): this also zeroes mBuffered and mInSending, which DataHandlerThread
/// and StfDropThread later decrement; if STFs are still queued when this runs, those
/// unsigned counters would wrap around — confirm the buffers are drained first.
StdSenderOutputCounters resetCounters() {
84+
// same mutex that StfDropThread holds while updating mCounters.mBuffered
std::scoped_lock lLock(mScheduledStfMapLock);
85+
// keep the pre-reset values so the caller can still inspect them
StdSenderOutputCounters lRet = mCounters;
86+
mCounters = StdSenderOutputCounters();
87+
return lRet;
88+
}
89+
7790
private:
7891
/// Ref to the main SubTimeBuilder O2 device
7992
StfSenderDevice& mDevice;
@@ -100,7 +113,7 @@ class StfSenderOutput
100113
mutable std::mutex mOutputMapLock;
101114
std::map<std::string, OutputChannelObjects> mOutputMap;
102115

103-
// Buffer mainenance
116+
// Buffer maintenance
104117
std::uint64_t mBufferSize = std::uint64_t(32) << 30;
105118
ConcurrentFifo<std::unique_ptr<SubTimeFrame>> mDropQueue;
106119
std::thread mStfDropThread;

src/StfSender/runStfSenderDevice.cxx

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
#include <SubTimeFrameFileSink.h>
1414
#include <Config.h>
1515
#include <FmqUtilities.h>
16+
#include <DataDistMonitoring.h>
1617

1718
#include <options/FairMQProgOptions.h>
1819

@@ -36,6 +37,8 @@ int main(int argc, char* argv[])
3637

3738
// Add InfoLogger Options
3839
r.fConfig.AddToCmdLineOptions(impl::DataDistLoggerCtx::getProgramOptions());
40+
// Add Monitoring Options
41+
r.fConfig.AddToCmdLineOptions(DataDistMonitor::getProgramOptions());
3942

4043
// StfSender options
4144
bpo::options_description lStfSenderOptions("StfSender options", 120);
@@ -57,7 +60,6 @@ int main(int argc, char* argv[])
5760

5861
// Add options for STF file sink
5962
r.fConfig.AddToCmdLineOptions(o2::DataDistribution::SubTimeFrameFileSink::getProgramOptions());
60-
6163
});
6264

6365
runner.AddHook<InstantiateDevice>([](DeviceRunner& r){

src/TfBuilder/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ set(EXE_TFB_SOURCES
99
)
1010

1111
add_library(TfBuilder_lib OBJECT ${EXE_TFB_SOURCES})
12-
target_link_libraries(TfBuilder_lib base fmqtools common discovery)
12+
target_link_libraries(TfBuilder_lib base fmqtools common discovery monitoring)
1313

1414
add_executable(TfBuilder)
1515

@@ -22,7 +22,7 @@ endif()
2222
target_link_libraries(TfBuilder
2323
PRIVATE
2424
TfBuilder_lib
25-
base fmqtools common discovery
25+
base fmqtools common discovery monitoring
2626
)
2727

2828
install(TARGETS TfBuilder RUNTIME DESTINATION bin)

0 commit comments

Comments
 (0)