@@ -202,27 +202,18 @@ void StfSenderOutput::StfKeepThread()
202202 std::default_random_engine lGen;
203203 std::uniform_int_distribution<unsigned > lUniformDist (0 , 99 );
204204
205- std::unique_ptr<SubTimeFrame> lStf;
206-
207205 mDeletePercentage = mDiscoveryConfig ->getUInt64Param (StandaloneStfDeleteChanceKey, StandaloneStfDeleteChanceDefault);
208206 mDeletePercentage = std::clamp (mDeletePercentage .load (), std::uint64_t (0 ), std::uint64_t (100 ));
209207
210- while ((lStf = mPipelineI .dequeue (eSenderIn)) != nullptr ) {
211- std::scoped_lock lMapLock (mStfKeepMapLock );
212-
213- const auto lStfId = lStf->id ();
214- const auto lStfSize = lStf->getDataSize ();
208+ while (mRunning ) {
209+ auto lStfOpt = mPipelineI .dequeue_for (eSenderIn, 50ms);
215210
216- if (mStfKeepMap .count (lStfId) == 0 ) {
217- mStfKeepMap [lStfId] = std::move (lStf);
218- }
211+ std::scoped_lock lMapLock (mStfKeepMapLock );
219212
220- // update buffer sizes
213+ // get buffer sizes
221214 StdSenderOutputCounters::Values lCounters;
222215 {
223216 std::scoped_lock lLock (mCounters .mCountersLock );
224- mCounters .mValues .mBuffered .mSize += lStfSize;
225- mCounters .mValues .mBuffered .mCnt += 1 ;
226217 lCounters = mCounters .mValues ;
227218 }
228219
@@ -253,6 +244,23 @@ void StfSenderOutput::StfKeepThread()
253244 mDropQueue .push (std::move (lIter->second ));
254245 mStfKeepMap .erase (lIter->first );
255246 }
247+
248+ if (lStfOpt) {
249+ std::unique_ptr<SubTimeFrame> lStf = std::move (lStfOpt.value ());
250+
251+ const auto lStfId = lStf->id ();
252+ const auto lStfSize = lStf->getDataSize ();
253+
254+ if (mStfKeepMap .count (lStfId) == 0 ) {
255+ mStfKeepMap [lStfId] = std::move (lStf);
256+ // update buffer sizes
257+ {
258+ std::scoped_lock lLock (mCounters .mCountersLock );
259+ mCounters .mValues .mBuffered .mSize += lStfSize;
260+ mCounters .mValues .mBuffered .mCnt += 1 ;
261+ }
262+ }
263+ }
256264 }
257265
258266 std::scoped_lock lMapLock (mStfKeepMapLock );
0 commit comments