Skip to content

Commit 94f1396

Browse files
committed
tfbuilder: synchronize on memory allocations
1 parent 55ed8cf commit 94f1396

3 files changed

Lines changed: 32 additions & 3 deletions

File tree

src/TfBuilder/TfBuilderDevice.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ TfBuilderDevice::~TfBuilderDevice()
4646

4747
void TfBuilderDevice::Init()
4848
{
49-
mMemI = std::make_unique<MemoryResources>(this->AddTransport(fair::mq::Transport::SHM));
49+
mMemI = std::make_unique<SyncMemoryResources>(this->AddTransport(fair::mq::Transport::SHM));
5050
}
5151

5252
void TfBuilderDevice::Reset()

src/TfBuilder/TfBuilderDevice.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class TfBuilderDevice : public DataDistDevice,
122122
bool dplEnabled() const noexcept { return mDplEnabled; }
123123

124124
/// Memory region singletons
125-
std::unique_ptr<MemoryResources> mMemI;
125+
std::unique_ptr<SyncMemoryResources> mMemI;
126126

127127
/// Configuration
128128
std::string mDplChannelName;

src/common/include/MemoryUtils.h

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ class MemoryResources {
477477
explicit MemoryResources(std::shared_ptr<FairMQTransportFactory> pShmTransport)
478478
: mShmTransport(pShmTransport) { }
479479

480-
~MemoryResources() {
480+
virtual ~MemoryResources() {
481481
// make sure to delete regions before dropping the transport
482482
mHeaderMemRes.reset();
483483
mDataMemRes.reset();
@@ -522,6 +522,35 @@ class MemoryResources {
522522
bool mRunning = true;
523523
};
524524

525+
526+
class SyncMemoryResources : public MemoryResources {
527+
public:
528+
SyncMemoryResources() = delete;
529+
explicit SyncMemoryResources(std::shared_ptr<FairMQTransportFactory> pShmTransport)
530+
: MemoryResources(pShmTransport) { }
531+
532+
virtual ~SyncMemoryResources() {}
533+
534+
inline
535+
FairMQMessagePtr newHeaderMessage(const std::size_t pSize) {
536+
assert(mHeaderMemRes);
537+
std::scoped_lock lock(mHdrLock);
538+
return mHeaderMemRes->NewFairMQMessage(pSize);
539+
}
540+
541+
inline
542+
FairMQMessagePtr newDataMessage(const std::size_t pSize) {
543+
assert(mDataMemRes);
544+
std::scoped_lock lock(mDataLock);
545+
return mDataMemRes->NewFairMQMessage(pSize);
546+
}
547+
548+
549+
private:
550+
std::mutex mHdrLock;
551+
std::mutex mDataLock;
552+
};
553+
525554
}
526555
} /* o2::DataDistribution */
527556

0 commit comments

Comments
 (0)