@@ -139,13 +139,10 @@ class RegionAllocatorResource
139139 pSize,
140140 pRegionFlags,
141141 [this ](const std::vector<FairMQRegionBlock>& pBlkVect) {
142-
143- static thread_local icl::interval_map<std::size_t , std::size_t > lIntMap;
142+ icl::interval_map<std::size_t , std::size_t > lIntMap;
144143 static thread_local double sMergeRatio = 0.5 ;
145144
146- std::int64_t lReclaimed = 0 ;
147-
148- lIntMap.clear ();
145+ std::uint64_t lReclaimed = 0 ;
149146
150147 for (const auto &lInt : pBlkVect) {
151148 if (lInt.size == 0 ) {
@@ -183,6 +180,7 @@ class RegionAllocatorResource
183180 }
184181
185182 mFree += lReclaimed;
183+ mGeneration += 1 ;
186184 }
187185
188186 // weighted average merge ratio
@@ -295,25 +293,34 @@ class RegionAllocatorResource
295293 auto lRet = try_alloc (pSize);
296294
297295 while (!lRet && mRunning ) {
296+ auto lGen = mGeneration .load ();
298297 // try to reclaim if possible
299298 if (try_reclaim (pSize)) {
300299 // try again
301300 lRet = try_alloc (pSize);
302301 }
303302
303+ if (lRet) {
304+ break ;
305+ }
306+
304307 if (mCanFail && !lRet) {
305308 WDDLOG_RL (1000 , " RegionAllocatorResource: Allocation failed. region={} alloc={} region_size={} free={}" ,
306309 mSegmentName , pSize, mRegion ->GetSize (), mFree );
307310 WDDLOG_RL (1000 , " Memory region '{}' is too small, or there is a large backpressure." , mSegmentName );
308311 return nullptr ;
309312 }
310313
311- if (!lRet ) {
314+ while ( true ) {
312315 using namespace std ::chrono_literals;
313316 WDDLOG_RL (1000 , " RegionAllocatorResource: waiting to allocate a message. region={} alloc={} region_size={} free={}" ,
314317 mSegmentName , pSize, mRegion ->GetSize (), mFree );
315318 WDDLOG_RL (1000 , " Memory region '{}' is too small, or there is a large backpressure." , mSegmentName );
316- std::this_thread::sleep_for (5ms);
319+ if (lGen != mGeneration .load ()) {
320+ break ; // retry alloc
321+ } else {
322+ std::this_thread::sleep_for (20ms);
323+ }
317324 }
318325 }
319326
@@ -451,6 +458,7 @@ class RegionAllocatorResource
451458
452459 // free space accounting
453460 std::atomic_int64_t mFree = 0 ;
461+ std::atomic_uint64_t mGeneration = 0 ; // bump when free is finished, so that we don't retry allocs
454462
455463 // two step reclaim to avoid lock contention in the allocation path
456464 std::mutex mReclaimLock ;
0 commit comments