Skip to content

Commit 5be8dfe

Browse files
committed
topology: option to stop aggregating on new orbit
1 parent 69aab85 commit 5be8dfe

File tree

6 files changed

+122
-49
lines changed

6 files changed

+122
-49
lines changed

src/common/ReadoutDataModel.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ struct RDHReaderIf {
6262
virtual std::uint32_t getMemorySize(const char* data) const = 0;
6363
virtual std::uint32_t getOffsetToNext(const char* data) const = 0;
6464
virtual bool getStopBit(const char* data) const = 0;
65+
virtual std::uint16_t getPageCounter(const char* data) const = 0;
6566
// trigger
6667
virtual std::uint32_t getOrbit(const char* data) const = 0;
6768
virtual std::uint16_t getBC(const char* data) const = 0;
@@ -145,6 +146,12 @@ class RDHReaderImpl final : public RDHReaderIf {
145146
return lRdh.stop;
146147
}
147148

149+
virtual inline
150+
std::uint16_t getPageCounter(const char* data) const override final {
151+
const RDH &lRdh = getHdrRef(data);
152+
return lRdh.pageCnt;
153+
}
154+
148155
// RDH trigger information
149156
virtual inline
150157
std::uint32_t getOrbit(const char* data) const override final {
@@ -318,6 +325,9 @@ class RDHReader {
318325
inline
319326
bool getStopBit() const { return I().getStopBit(mData); }
320327

328+
inline
329+
std::uint16_t getPageCounter() const { return I().getPageCounter(mData); }
330+
321331
// RDH trigger information
322332
inline
323333
std::uint32_t getOrbit() const { return I().getOrbit(mData); };

src/common/SubTimeFrameBuilder.cxx

Lines changed: 80 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ bool SubTimeFrameReadoutBuilder::addHbFrames(
8383
const auto R = RDHReader(pHbFramesBegin[0]);
8484
mStf->updateFirstOrbit(R.getOrbit());
8585
} catch (...) {
86-
EDDLOG("Error getting RDHReader instace. Not using {} HBFs", pHBFrameLen);
86+
EDDLOG("Error getting RDHReader instance. Not using {} HBFs", pHBFrameLen);
8787
return false;
8888
}
8989
}
@@ -230,17 +230,32 @@ std::optional<std::unique_ptr<SubTimeFrame>> SubTimeFrameReadoutBuilder::addTopo
230230
const o2::header::DataOrigin &pDataOrig,
231231
const o2::header::DataHeader::SubSpecificationType pSubSpecification,
232232
const ReadoutSubTimeframeHeader& pHdr,
233-
std::vector<FairMQMessagePtr>::iterator pHbFramesBegin, const std::size_t pHBFrameLen,
234-
const std::uint64_t pMaxNumMessages)
233+
std::vector<FairMQMessagePtr>::iterator &pHbFramesBegin, std::size_t &pHBFrameLen,
234+
const std::uint64_t pMaxNumMessages, bool pCutTfOnNewOrbit)
235235
{
236236
static uint32_t sTfId = 1;
237237

238+
auto isFirstPacketOfOrbit = [&](const FairMQMessagePtr &pMsg) -> bool {
239+
if (!pMsg) {
240+
return false;
241+
}
242+
try {
243+
const auto R = RDHReader(pMsg);
244+
if (R.getPageCounter() == 0) {
245+
return true;
246+
}
247+
} catch (...) {
248+
EDDLOG_RL(5000, "Error getting RDHReader instance. page_size={}", pMsg->GetSize());
249+
}
250+
return false;
251+
};
252+
238253
if (!mRunning) {
239254
WDDLOG("Adding HBFrames while STFBuilder is not running!");
240255
return std::nullopt;
241256
}
242257

243-
if (!mAcceptStfData) {
258+
if (!mAcceptStfData || (pHBFrameLen == 0)) {
244259
return std::nullopt;
245260
}
246261

@@ -259,60 +274,80 @@ std::optional<std::unique_ptr<SubTimeFrame>> SubTimeFrameReadoutBuilder::addTopo
259274
}
260275
}
261276

262-
DataHeader lDataHdr(
263-
o2::header::gDataDescriptionRawData,
264-
pDataOrig,
265-
pSubSpecification,
266-
0 /* Update later */
267-
);
268-
lDataHdr.payloadSerializationMethod = gSerializationMethodNone;
277+
auto lReturnStfIfFinished = [&](const auto &pHbFrame) -> std::optional<std::unique_ptr<o2::DataDistribution::SubTimeFrame> > {
278+
if (!lStf || (lStfNumMessages == 0)) {
279+
return std::nullopt;
280+
}
281+
// before starting, check if the existing STF is large enough and cut conditions are met
282+
if ((!pCutTfOnNewOrbit && (lStfNumMessages >= pMaxNumMessages)) || // orbit cut not used
283+
(pCutTfOnNewOrbit && (lStfNumMessages >= pMaxNumMessages) && isFirstPacketOfOrbit(pHbFrame))) { // orbit cut used
269284

270-
const o2hdr::DataIdentifier lDataId(o2::header::gDataDescriptionRawData.str, pDataOrig.str);
285+
lStf->setOrigin(SubTimeFrame::Header::Origin::eReadoutTopology);
271286

272-
bool lIncludeO2Hdr = true;
273-
for (size_t i = 0; i < pHBFrameLen; i++) {
274-
// we need at least one header per equipment
275-
if (lIncludeO2Hdr) {
276-
lIncludeO2Hdr = false; // only provide one header message
287+
DDDLOG_RL(1000, "addTopoStfData: leaving and returning STF: numMessages={}", lStfNumMessages);
288+
lStfNumMessages = 0;
277289

278-
lDataHdr.payloadSize = pHbFramesBegin[i]->GetSize();
290+
mAcceptStfData = true;
291+
return std::optional<std::unique_ptr<SubTimeFrame>>(std::move(lStf));
292+
}
279293

280-
auto lDplHdr = o2::framework::DataProcessingHeader{lStf->header().mId};
281-
lDplHdr.creation = lStf->header().mCreationTimeMs;
282-
auto lStack = Stack(
283-
lDataHdr,
284-
lDplHdr
285-
);
294+
return std::nullopt;
295+
};
286296

287-
auto lHdrMsg = mMemRes.newHeaderMessage(lStack.data(), lStack.size());
288-
if (!lHdrMsg) {
289-
WDDLOG_RL(1000, "Allocation error: dropping data of the current STF stf_id={}", pHdr.mRunNumber);
290-
// clear data of the partial STF
291-
mAcceptStfData = false;
292-
lStf->clear();
297+
// check if we should cut at the first block
298+
{
299+
auto lDoneStfOpt = lReturnStfIfFinished(*pHbFramesBegin);
300+
if (lDoneStfOpt.has_value()) {
301+
return lDoneStfOpt;
302+
}
303+
}
293304

294-
return std::nullopt;
295-
}
305+
const o2hdr::DataIdentifier lDataId(o2::header::gDataDescriptionRawData.str, pDataOrig.str);
296306

297-
lStf->addStfDataReadout(lDataId, pSubSpecification, SubTimeFrame::StfData{std::move(lHdrMsg), std::move(pHbFramesBegin[i])});
298-
} else {
299-
lStf->addStfDataReadout(lDataId, pSubSpecification, SubTimeFrame::StfData{nullptr, std::move(pHbFramesBegin[i])});
307+
if (!lStf->stfDataExists(lDataId, pSubSpecification)) {
308+
DataHeader lDataHdr(
309+
o2::header::gDataDescriptionRawData,
310+
pDataOrig,
311+
pSubSpecification,
312+
0 /* Update later */
313+
);
314+
lDataHdr.payloadSerializationMethod = gSerializationMethodNone;
315+
lDataHdr.payloadSize = (*pHbFramesBegin)->GetSize();
316+
317+
auto lDplHdr = o2::framework::DataProcessingHeader{lStf->header().mId};
318+
lDplHdr.creation = lStf->header().mCreationTimeMs;
319+
320+
auto lStack = Stack(lDataHdr, lDplHdr);
321+
322+
auto lHdrMsg = mMemRes.newHeaderMessage(lStack.data(), lStack.size());
323+
if (!lHdrMsg) {
324+
WDDLOG_RL(10000, "Allocation error: dropping data of the current STF stf_id={}", pHdr.mRunNumber);
325+
// clear data of the partial STF
326+
mAcceptStfData = false;
327+
lStf->clear();
328+
return std::nullopt;
300329
}
301-
}
302330

303-
// update number of messages per topo stf
304-
lStfNumMessages += pHBFrameLen;
331+
lStf->addStfDataReadout(lDataId, pSubSpecification, SubTimeFrame::StfData{std::move(lHdrMsg), std::move(*pHbFramesBegin)});
332+
lStfNumMessages += 1;
333+
pHBFrameLen -= 1;
334+
pHbFramesBegin += 1;
335+
}
305336

306-
if (lStfNumMessages >= pMaxNumMessages) {
307-
lStf->setOrigin(SubTimeFrame::Header::Origin::eReadoutTopology);
308-
std::optional<std::unique_ptr<SubTimeFrame>> lRetStf = std::move(lStf);
337+
while (pHBFrameLen > 0) {
338+
// check if we should cut at the next
339+
auto lDoneStfOpt = lReturnStfIfFinished(*pHbFramesBegin);
340+
if (lDoneStfOpt.has_value()) {
341+
return lDoneStfOpt;
342+
}
309343

310-
DDDLOG_RL(1000, "addTopoStfData: leaving and returning STF: numMessages={}", lStfNumMessages);
311-
lStfNumMessages = 0;
312-
return lRetStf;
344+
lStf->addStfDataReadout(lDataId, pSubSpecification, SubTimeFrame::StfData{nullptr, std::move(*pHbFramesBegin)});
345+
lStfNumMessages += 1;
346+
pHBFrameLen -= 1;
347+
pHbFramesBegin += 1;
313348
}
314349

315-
DDDLOG_RL(1000, "addTopoStfData: leaving without returning STF: numMessages={}", lStfNumMessages);
350+
DDDLOG_RL(10000, "addTopoStfData: leaving without returning STF: numMessages={}", lStfNumMessages);
316351

317352
return std::nullopt;
318353
}

src/common/SubTimeFrameBuilder.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ class SubTimeFrameReadoutBuilder
4848
std::optional<std::unique_ptr<SubTimeFrame>> addTopoStfData(const o2::header::DataOrigin &pDataOrig,
4949
const o2::header::DataHeader::SubSpecificationType pSubSpecification,
5050
const ReadoutSubTimeframeHeader& pHdr,
51-
std::vector<FairMQMessagePtr>::iterator pHbFramesBegin, const std::size_t pHBFrameLen,
52-
const std::uint64_t pMaxNumMessages);
51+
std::vector<FairMQMessagePtr>::iterator &pHbFramesBegin, std::size_t &pHBFrameLen,
52+
const std::uint64_t pMaxNumMessages, bool pCutTfOnNewOrbit);
5353

5454
std::optional<std::uint32_t> getCurrentStfId() const {
5555
return (mStf) ? std::optional<std::uint32_t>(mStf->header().mId) : std::nullopt;
@@ -71,17 +71,20 @@ class SubTimeFrameReadoutBuilder
7171
return (lStf) ? std::optional<std::unique_ptr<SubTimeFrame>>(std::move(lStf)) : std::nullopt;
7272
}
7373

74-
// fo aggregation of threshold scan data
74+
// for aggregation of threshold scan data
7575
std::optional<std::unique_ptr<SubTimeFrame>> getTopoStf() {
7676
if (mTopoStfMap.empty()) {
7777
return std::nullopt;
7878
}
7979

8080
const auto lBegin = mTopoStfMap.begin();
8181
std::unique_ptr<SubTimeFrame> lStf = std::move(lBegin->second.second);
82-
8382
mTopoStfMap.erase(lBegin);
8483

84+
if (lStf) {
85+
lStf->setOrigin(SubTimeFrame::Header::Origin::eReadoutTopology);
86+
}
87+
8588
mAcceptStfData = true;
8689
return (lStf) ? std::optional<std::unique_ptr<SubTimeFrame>>(std::move(lStf)) : std::nullopt;
8790
}

src/common/SubTimeFrameDataModel.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,20 @@ class SubTimeFrame : public IDataModelObject
390390
/// helper methods
391391
///
392392

393+
inline bool stfDataExists(const o2hdr::DataIdentifier &pDataId, const o2::header::DataHeader::SubSpecificationType pSubSpec)
394+
{
395+
const auto lDataIt = mData.find(pDataId);
396+
if (lDataIt == mData.end()) {
397+
return false;
398+
}
399+
400+
if (lDataIt->second.count(pSubSpec) > 0) {
401+
return true;
402+
}
403+
404+
return false;
405+
}
406+
393407
// This is only to be used with the data building from readout
394408
// in this case, only a single split-payload will be present in the data vector
395409
// If any other header is provided, it can be discarded

src/common/base/Utilities.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@
2828
namespace o2::DataDistribution
2929
{
3030

31+
static inline constexpr
32+
void assume(const bool pPred) {
33+
if (!pPred) {
34+
__builtin_unreachable();
35+
}
36+
}
37+
3138
template <class F, class ... Args>
3239
std::thread create_thread_member(const char* name, F&& f, Args&&... args) {
3340
char *lName = strdup(name);

src/common/discovery/DataDistributionOptions.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ static constexpr std::string_view DataDistNetworkTransportDefault = "ucx";
3636
// Page aggregation for topological runs. Larger number of pages decreases FLP-EPN interaction rate (better performance)
3737
static constexpr std::string_view NumPagesInTopologicalStfKey = "NumPagesInTopologicalStf";
3838
static constexpr std::uint64_t NumPagesInTopologicalStfDefault = 128;
39+
// Topological STFs: Force starting STFs on new Orbit only
40+
static constexpr std::string_view StartTopologicalStfOnNewOrbitfKey = "StartTopologicalStfOnNewOrbit";
41+
static constexpr bool StartTopologicalStfOnNewOrbitDefault = true;
42+
3943

4044

4145
////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)