Skip to content

Commit 3198a29

Browse files
committed
emu: make the TFID dependent on system time to allow testing on cluster without external synchronization
1 parent 6cab745 commit 3198a29

File tree

3 files changed

+72
-51
lines changed

3 files changed

+72
-51
lines changed

src/ReadoutEmulator/CruEmulator.cxx

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,13 @@ namespace DataDistribution
2727

2828
void CruLinkEmulator::linkReadoutThread()
2929
{
30-
static const size_t cHBFrameFreq = 11223;
31-
static const size_t cStfPerS = 43; /* Parametrize this? */
30+
static const std::uint64_t cHBFrameFreq = 11223;
3231

3332
const auto cSuperpageSize = mMemHandler->getSuperpageSize();
3433
const auto cHBFrameSize = (mLinkBitsPerS / cHBFrameFreq) >> 3;
35-
const auto cStfLinkSize = cHBFrameSize * cHBFrameFreq / cStfPerS;
34+
const auto cStfLinkSize = cHBFrameSize * cHBFrameFreq * 256 / cHBFrameFreq;
3635
const auto cNumDmaChunkPerSuperpage = std::min(size_t(256), size_t(cSuperpageSize / mDmaChunkSize));
37-
constexpr int64_t cStfTimeUs = std::chrono::microseconds(1000000 / cStfPerS).count();
36+
constexpr int64_t cStfTimeUs = std::chrono::microseconds(std::uint64_t(1000000) * 256 / cHBFrameFreq).count();
3837

3938
LOG(DEBUG) << "Superpage size: " << cSuperpageSize;
4039
LOG(DEBUG) << "mDmaChunkSize size: " << mDmaChunkSize;
@@ -57,9 +56,7 @@ void CruLinkEmulator::linkReadoutThread()
5756
const int64_t lStfToSend = lUsSinceStart / cStfTimeUs - lSentStf;
5857

5958
if (lStfToSend <= 0) {
60-
// std::this_thread::sleep_for(std::chrono::microseconds());
6159
usleep(500);
62-
// std::this_thread::yield();
6360
continue;
6461
}
6562

src/ReadoutEmulator/ReadoutDevice.cxx

Lines changed: 67 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ void ReadoutDevice::PreRun()
8989
mGui = std::make_unique<RootGui>("Emulator", "Readout Emulator", 500, 500);
9090
mGuiThread = std::thread(&ReadoutDevice::GuiThread, this);
9191
}
92+
93+
mSendingThread = std::thread(&ReadoutDevice::SendingThread, this);
9294
}
9395

9496
void ReadoutDevice::ResetTask()
@@ -101,65 +103,85 @@ void ReadoutDevice::ResetTask()
101103
if (mBuildHistograms && mGuiThread.joinable()) {
102104
mGuiThread.join();
103105
}
106+
107+
if (mSendingThread.joinable()) {
108+
mSendingThread.join();
109+
}
104110
}
105111

106112
bool ReadoutDevice::ConditionalRun()
107113
{
114+
using namespace std::chrono_literals;
115+
std::this_thread::sleep_for(1s);
116+
return true;
117+
}
118+
119+
void ReadoutDevice::SendingThread()
120+
{
121+
122+
WaitForRunningState();
123+
108124
auto& lOutputChan = GetChannel(mOutChannelName, 0);
109125

110126
// finish an STF every ~1/45 seconds
111127
static const auto cDataTakingStart = std::chrono::high_resolution_clock::now();
112-
static constexpr auto cStfInterval = std::chrono::microseconds(22222);
128+
static constexpr auto cStfInterval = std::chrono::microseconds(22810);
113129
static uint64_t lNumberSentStfs = 0;
130+
static uint64_t lCurrentTfId = 0;
114131

115-
auto isStfFinished =
116-
(std::chrono::high_resolution_clock::now() - cDataTakingStart) - (lNumberSentStfs * cStfInterval) > cStfInterval;
117-
118-
if (isStfFinished)
119-
lNumberSentStfs += 1;
120-
121-
ReadoutLinkO2Data lCruLinkData;
122-
if (!mCruMemoryHandler->getLinkData(lCruLinkData)) {
123-
LOG(INFO) << "GetLinkData failed. Stopping interface thread.";
124-
return false;
125-
}
126-
127-
if (mBuildHistograms)
128-
mFreeSuperpagesSamples.Fill(mCruMemoryHandler->free_superpages());
129-
130-
// check no data signal
131-
if (lCruLinkData.mLinkDataHeader.subSpecification ==
132-
o2::header::DataHeader::SubSpecificationType(-1)) {
133-
// LOG(WARN) << "No Superpages left! Losing data...";
134-
return true;
135-
}
136-
137-
ReadoutSubTimeframeHeader lHBFHeader;
138-
lHBFHeader.mTimeFrameId = lNumberSentStfs;
139-
lHBFHeader.mNumberHbf = lCruLinkData.mLinkRawData.size();
140-
lHBFHeader.mLinkId = lCruLinkData.mLinkDataHeader.subSpecification;
141-
142-
assert(mDataBlockMsgs.empty());
143-
mDataBlockMsgs.reserve(lCruLinkData.mLinkRawData.size());
144-
145-
// create messages for the header
146-
mDataBlockMsgs.emplace_back(lOutputChan.NewMessage(sizeof(ReadoutSubTimeframeHeader)));
147-
std::memcpy(mDataBlockMsgs.front()->GetData(), &lHBFHeader, sizeof(ReadoutSubTimeframeHeader));
148-
149-
// create messages for the data
150-
for (const auto& lDmaChunk : lCruLinkData.mLinkRawData) {
151-
// mark this as used in the memory handler
152-
mCruMemoryHandler->get_data_buffer(lDmaChunk.mDataPtr, lDmaChunk.mDataSize);
132+
while (IsRunningState()) {
153133

154-
// create a message out of unmanaged region
155-
mDataBlockMsgs.emplace_back(lOutputChan.NewMessage(mDataRegion, lDmaChunk.mDataPtr, lDmaChunk.mDataSize));
134+
auto isStfFinished =
135+
(std::chrono::high_resolution_clock::now() - cDataTakingStart) - (lNumberSentStfs * cStfInterval) > cStfInterval;
136+
137+
if (isStfFinished) {
138+
lNumberSentStfs += 1;
139+
lCurrentTfId = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count() / cStfInterval.count();
140+
}
141+
142+
ReadoutLinkO2Data lCruLinkData;
143+
if (!mCruMemoryHandler->getLinkData(lCruLinkData)) {
144+
LOG(INFO) << "GetLinkData failed. Stopping interface thread.";
145+
return;
146+
}
147+
148+
if (mBuildHistograms) {
149+
mFreeSuperpagesSamples.Fill(mCruMemoryHandler->free_superpages());
150+
}
151+
152+
// check no data signal
153+
if (lCruLinkData.mLinkDataHeader.subSpecification ==
154+
o2::header::DataHeader::SubSpecificationType(-1)) {
155+
// LOG(WARN) << "No Superpages left! Losing data...";
156+
}
157+
158+
ReadoutSubTimeframeHeader lHBFHeader;
159+
lHBFHeader.mTimeFrameId = lCurrentTfId;
160+
lHBFHeader.mNumberHbf = lCruLinkData.mLinkRawData.size();
161+
lHBFHeader.mLinkId = lCruLinkData.mLinkDataHeader.subSpecification;
162+
163+
assert(mDataBlockMsgs.empty());
164+
mDataBlockMsgs.reserve(lCruLinkData.mLinkRawData.size());
165+
166+
// create messages for the header
167+
mDataBlockMsgs.emplace_back(lOutputChan.NewMessage(sizeof(ReadoutSubTimeframeHeader)));
168+
std::memcpy(mDataBlockMsgs.front()->GetData(), &lHBFHeader, sizeof(ReadoutSubTimeframeHeader));
169+
170+
// create messages for the data
171+
for (const auto& lDmaChunk : lCruLinkData.mLinkRawData) {
172+
// mark this as used in the memory handler
173+
mCruMemoryHandler->get_data_buffer(lDmaChunk.mDataPtr, lDmaChunk.mDataSize);
174+
175+
// create a message out of unmanaged region
176+
mDataBlockMsgs.emplace_back(lOutputChan.NewMessage(mDataRegion, lDmaChunk.mDataPtr, lDmaChunk.mDataSize));
177+
}
178+
179+
lOutputChan.Send(mDataBlockMsgs);
180+
mDataBlockMsgs.clear();
156181
}
182+
}
157183

158-
lOutputChan.Send(mDataBlockMsgs);
159-
mDataBlockMsgs.clear();
160184

161-
return true;
162-
}
163185

164186
void ReadoutDevice::GuiThread()
165187
{

src/ReadoutEmulator/ReadoutDevice.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class ReadoutDevice : public DataDistDevice
5858

5959

6060
void GuiThread();
61+
void SendingThread();
6162

6263
// data and Descriptor regions
6364
// must be here because NewUnmanagedRegionFor() is a method of FairMQDevice...
@@ -79,6 +80,7 @@ class ReadoutDevice : public DataDistDevice
7980

8081
// messages to send
8182
std::vector<FairMQMessagePtr> mDataBlockMsgs;
83+
std::thread mSendingThread;
8284

8385
/// Observables
8486
bool mBuildHistograms = true;

0 commit comments

Comments
 (0)