Skip to content

Commit e5602e5

Browse files
committed
DPL: navigate a MessageSet without caching pairs
1 parent 87b5043 commit e5602e5

File tree

3 files changed

+127
-132
lines changed

3 files changed

+127
-132
lines changed

Framework/Core/include/Framework/MessageSet.h

Lines changed: 20 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
#define FRAMEWORK_MESSAGESET_H
1313

1414
#include "Framework/PartRef.h"
15+
#include <fairmq/Message.h>
16+
#include "Framework/DataModelViews.h"
1517
#include <memory>
1618
#include <vector>
1719
#include <cassert>
1820

19-
namespace o2
20-
{
21-
namespace framework
21+
namespace o2::framework
2222
{
2323

2424
/// A set of inflight messages.
@@ -31,41 +31,23 @@ namespace framework
3131
/// O2 message model. For this purpose, also the pair index is filled and can
3232
/// be used to access header and payload associated with a pair
3333
struct MessageSet {
34-
struct Index {
35-
Index(size_t p, size_t s) : position(p), size(s) {}
36-
size_t position = 0;
37-
size_t size = 0;
38-
};
3934
// linear storage of messages
4035
std::vector<fair::mq::MessagePtr> messages;
41-
// message map describes O2 messages consisting of a header message and
42-
// payload message(s), index describes position in the linear storage
43-
std::vector<Index> messageMap;
44-
// pair map describes all messages in one sequence of header-payload pairs and
45-
// where in the message index the associated header and payload can be found
46-
struct PairMapping {
47-
PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {}
48-
// O2 message where the pair is located in
49-
size_t partIndex = 0;
50-
// payload index within the O2 message
51-
size_t payloadIndex = 0;
52-
};
53-
std::vector<PairMapping> pairMap;
5436

5537
MessageSet()
56-
: messages(), messageMap(), pairMap()
38+
: messages()
5739
{
5840
}
5941

6042
template <typename F>
6143
MessageSet(F getter, size_t size)
62-
: messages(), messageMap(), pairMap()
44+
: messages()
6345
{
6446
add(std::forward<F>(getter), size);
6547
}
6648

6749
MessageSet(MessageSet&& other)
68-
: messages(std::move(other.messages)), messageMap(std::move(other.messageMap)), pairMap(std::move(other.pairMap))
50+
: messages(std::move(other.messages))
6951
{
7052
other.clear();
7153
}
@@ -76,36 +58,32 @@ struct MessageSet {
7658
return *this;
7759
}
7860
messages = std::move(other.messages);
79-
messageMap = std::move(other.messageMap);
80-
pairMap = std::move(other.pairMap);
8161
other.clear();
8262
return *this;
8363
}
8464

8565
/// get number of in-flight O2 messages
86-
size_t size() const
66+
[[nodiscard]] size_t size() const
8767
{
88-
return messageMap.size();
68+
return messages | count_parts{};
8969
}
9070

9171
/// get number of header-payload pairs
92-
size_t getNumberOfPairs() const
72+
[[nodiscard]] size_t getNumberOfPairs() const
9373
{
94-
return pairMap.size();
74+
return messages | count_payloads{};
9575
}
9676

9777
/// get number of payloads for an in-flight message
98-
size_t getNumberOfPayloads(size_t mi) const
78+
[[nodiscard]] size_t getNumberOfPayloads(size_t mi) const
9979
{
100-
return messageMap[mi].size;
80+
return messages | get_num_payloads{mi};
10181
}
10282

10383
/// clear the set
10484
void clear()
10585
{
10686
messages.clear();
107-
messageMap.clear();
108-
pairMap.clear();
10987
}
11088

11189
// this is more or less legacy
@@ -122,8 +100,6 @@ struct MessageSet {
122100
// add content of the part ref
123101
void add(PartRef&& ref)
124102
{
125-
pairMap.emplace_back(messageMap.size(), 0);
126-
messageMap.emplace_back(messages.size(), 1);
127103
messages.emplace_back(std::move(ref.header));
128104
messages.emplace_back(std::move(ref.payload));
129105
}
@@ -132,53 +108,32 @@ struct MessageSet {
132108
template <typename F>
133109
void add(F getter, size_t size)
134110
{
135-
auto partid = messageMap.size();
136-
messageMap.emplace_back(messages.size(), size - 1);
137111
for (size_t i = 0; i < size; ++i) {
138-
if (i > 0) {
139-
pairMap.emplace_back(partid, i - 1);
140-
}
141112
messages.emplace_back(std::move(getter(i)));
142113
}
143114
}
144115

145116
fair::mq::MessagePtr& header(size_t partIndex)
146117
{
147-
return messages[messageMap[partIndex].position];
118+
return messages[(messages | get_dataref_indices{partIndex, 0}).headerIdx];
148119
}
149120

150121
fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
151122
{
152-
assert(partIndex < messageMap.size());
153-
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
154-
return messages[messageMap[partIndex].position + payloadIndex + 1];
155-
}
156-
157-
fair::mq::MessagePtr const& header(size_t partIndex) const
158-
{
159-
return messages[messageMap[partIndex].position];
123+
return messages[(messages | get_dataref_indices{partIndex, payloadIndex}).payloadIdx];
160124
}
161125

162-
fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
126+
[[nodiscard]] fair::mq::MessagePtr const& header(size_t partIndex) const
163127
{
164-
assert(partIndex < messageMap.size());
165-
assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
166-
return messages[messageMap[partIndex].position + payloadIndex + 1];
128+
return messages[(messages | get_dataref_indices{partIndex, 0}).headerIdx];
167129
}
168130

169-
fair::mq::MessagePtr const& associatedHeader(size_t pos) const
131+
[[nodiscard]] fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
170132
{
171-
return messages[messageMap[pairMap[pos].partIndex].position];
172-
}
173-
174-
fair::mq::MessagePtr const& associatedPayload(size_t pos) const
175-
{
176-
auto partIndex = pairMap[pos].partIndex;
177-
auto payloadIndex = pairMap[pos].payloadIndex;
178-
return messages[messageMap[partIndex].position + payloadIndex + 1];
133+
return messages[(messages | get_dataref_indices{partIndex, payloadIndex}).payloadIdx];
179134
}
180135
};
181136

182-
} // namespace framework
183-
} // namespace o2
137+
} // namespace o2::framework
138+
184139
#endif // FRAMEWORK_MESSAGESET_H

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2140,8 +2140,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21402140
// sequence is the header message
21412141
// - each part has one or more payload messages
21422142
// - InputRecord provides all payloads as header-payload pair
2143-
auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2144-
auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2143+
//
2144+
auto const& [headerMsgIndex, payloadMsgIndex] = currentSetOfInputs[i].messages | get_pair{partindex};
2145+
auto const& headerMsg = currentSetOfInputs[i].messages[headerMsgIndex];
2146+
auto const& payloadMsg = currentSetOfInputs[i].messages[payloadMsgIndex];
21452147
headerptr = static_cast<char const*>(headerMsg->GetData());
21462148
payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
21472149
payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;

0 commit comments

Comments
 (0)