1717
1818#include < boost/container/pmr/memory_resource.hpp>
1919#include < boost/filesystem.hpp>
20+ #include < boost/icl/interval_map.hpp>
21+ #include < boost/icl/right_open_interval.hpp>
2022
2123#include < fairmq/FairMQDevice.h>
2224#include < fairmq/FairMQChannel.h>
3941#include < sys/resource.h>
4042#endif
4143
44+ namespace icl = boost::icl;
45+
4246class DataHeader ;
4347class FairMQUnmanagedRegion ;
4448
@@ -51,7 +55,6 @@ static constexpr const char *ENV_NOLOCK = "DATADIST_NO_MLOCK";
5155static constexpr const char *ENV_SHM_PATH = " DATADIST_SHM_PATH" ;
5256static constexpr const char *ENV_SHM_DELAY = " DATADIST_SHM_DELAY" ;
5357
54-
5558template <size_t ALIGN = 64 >
5659class RegionAllocatorResource
5760{
@@ -135,50 +138,49 @@ class RegionAllocatorResource
135138 pRegionFlags,
136139 [this ](const std::vector<FairMQRegionBlock>& pBlkVect) {
137140
138- static thread_local std::vector<FairMQRegionBlock> lSortedBlks ;
141+ static thread_local icl::interval_map<std:: size_t , std:: size_t > lIntMap ;
139142 static thread_local double sMergeRatio = 0.5 ;
140143
141- lSortedBlks = pBlkVect;
142-
143- // sort and try to merge regions later
144- std::sort (lSortedBlks.begin (), lSortedBlks.end (), [](FairMQRegionBlock &a, FairMQRegionBlock &b) {
145- return a.ptr < b.ptr ;
146- });
147-
148- // merge the blocks
149- std::uint64_t lMergedBlocks = 0 ;
150- for (std::size_t i = 0 ; i < lSortedBlks.size () - 1 ; i++ ) {
151- for (std::size_t j = i+1 ; j < lSortedBlks.size (); j++ ) {
152- if (reinterpret_cast <char *>(lSortedBlks[i].ptr ) + align_size_up (lSortedBlks[i].size ) == lSortedBlks[j].ptr ) {
153- lSortedBlks[i].size = align_size_up (lSortedBlks[i].size ) + align_size_up (lSortedBlks[j].size );
154- lSortedBlks[j].ptr = nullptr ;
155- lMergedBlocks++;
156- } else {
157- i = j;
158- break ;
159- }
144+ std::int64_t lReclaimed = 0 ;
145+
146+ lIntMap.clear ();
147+
148+ for (const auto &lInt : pBlkVect) {
149+ if (lInt.size == 0 ) {
150+ continue ;
160151 }
152+
153+ lIntMap += std::make_pair (
154+ icl::discrete_interval<std::size_t >::right_open (
155+ std::size_t (lInt.ptr ) , std::size_t (lInt.ptr ) + lInt.size ), std::size_t (1 ));
161156 }
162157
163- // callback to be called when message buffers no longer needed by transports
164- std::scoped_lock lock (mReclaimLock );
158+ {
159+ // callback to be called when message buffers no longer needed by transports
160+ std::scoped_lock lock (mReclaimLock );
165161
166- std::int64_t lReclaimed = 0 ;
167- for (const auto &lBlk : lSortedBlks) {
162+ for (const auto &lIntMerged : lIntMap) {
163+ if (lIntMerged.second > 1 ) {
164+ EDDLOG (" CreateUnmanagedRegion reclaim BUG! Multiple overlapping intervals:" );
165+ for (const auto &i : lIntMap) {
166+ EDDLOG (" - [{},{}) : count={}" , i.first .lower (), i.first .upper (), i.second );
167+ }
168168
169- if (!lBlk.ptr ) { // merged
170- continue ;
171- }
169+ continue ; // skip the overlapping thing
170+ }
171+
172+ const auto lLen = lIntMerged.first .upper () - lIntMerged.first .lower ();
173+ lReclaimed += lLen;
172174
173- // we can merge with alignied sizes
174- lReclaimed += align_size_up (lBlk.size );
175- reclaimSHMMessage (lBlk.ptr , lBlk.size );
175+ reclaimSHMMessage ((void *) lIntMerged.first .lower (), lLen);
176+ }
176177 }
177178
178- mFree . fetch_add (lReclaimed, std::memory_order_relaxed) ;
179+ mFree += lReclaimed ;
179180
180181 // weighted average merge ratio
181- sMergeRatio = sMergeRatio * 0.75 + double (lMergedBlocks) / double (pBlkVect.size ()) * 0.25 ;
182+ sMergeRatio = sMergeRatio * 0.75 + double (pBlkVect.size () - lIntMap.iterative_size ()) /
183+ double (pBlkVect.size ()) * 0.25 ;
182184 DDLOGF_RL (5000 , DataDistSeverity::debug, " Memory segment '{}'::block merging ratio average={:.4}" ,
183185 mSegmentName , sMergeRatio );
184186 },
@@ -195,11 +197,10 @@ class RegionAllocatorResource
195197 mStart = static_cast <char *>(mRegion ->GetData ());
196198 mSegmentSize = mRegion ->GetSize ();
197199 mLength = mRegion ->GetSize ();
200+ mFree = mSegmentSize ;
198201
199202 memset (mStart , 0xAA , mLength );
200203
201- mFree = mSegmentSize ;
202-
203204 // Insert delay for testing
204205 const auto lShmDelay = std::getenv (ENV_SHM_DELAY);
205206 if (lShmDelay) {
@@ -259,13 +260,6 @@ class RegionAllocatorResource
259260
260261 void * do_allocate (std::size_t pSize, std::size_t /* pAlign */ )
261262 {
262- #if !defined(NDEBUG)
263- static const std::thread::id d_sThisId = std::this_thread::get_id ();
264- if (d_sThisId != std::this_thread::get_id ()) {
265- DDLOGF_RL (1000 , DataDistSeverity::error, " Allocation from RegionAllocatorResource {} is not thread safe" ,
266- mSegmentName );
267- }
268- #endif
269263
270264 if (!mRunning ) {
271265 return nullptr ;
@@ -293,7 +287,7 @@ class RegionAllocatorResource
293287
294288 DDLOGF_RL (1000 , DataDistSeverity::warning,
295289 " RegionAllocatorResource: waiting to allocate a message. region={} alloc={} region_size={} free={} " ,
296- mSegmentName , pSize, mRegion ->GetSize (), mFree . load (std::memory_order_acquire) );
290+ mSegmentName , pSize, mRegion ->GetSize (), mFree );
297291 DDLOGF_RL (1000 , DataDistSeverity::warning, " Memory region '{}' is too small, or there is a large backpressure." ,
298292 mSegmentName );
299293
@@ -307,12 +301,11 @@ class RegionAllocatorResource
307301 return nullptr ;
308302 }
309303
310- mFree . fetch_sub (pSize, std::memory_order_relaxed) ;
304+ mFree -= pSize ;
311305
312- // Todo: find a better place for this reporting
313306 static std::size_t sLogRateLimit = 0 ;
314307 if (sLogRateLimit ++ % 1024 == 0 ) {
315- const auto lFree = mFree . load (std::memory_order_acquire) ;
308+ const std:: int64_t lFree = mFree ;
316309 DDLOGF_RL (2000 , DataDistSeverity::debug, " DataRegionResource {} memory free={} allocated={}" ,
317310 mSegmentName , lFree, (mSegmentSize - lFree));
318311 }
@@ -321,6 +314,7 @@ class RegionAllocatorResource
321314 }
322315
323316private:
317+ inline
324318 void * try_alloc (const std::size_t pSize) {
325319 if (mLength >= pSize) {
326320 const auto lObjectPtr = mStart ;
@@ -352,34 +346,46 @@ class RegionAllocatorResource
352346 mStart = nullptr ;
353347 mLength = 0 ;
354348
355- if (mFrees .empty ()) {
349+ if (mFreeRanges .empty ()) {
356350 return false ;
357351 }
358352
359- // find the largest free extent and return it if the size is adequate
360- auto lMaxIter = std::max_element (std::begin (mFrees ), std::end (mFrees ),
361- [](const auto & l, const auto & r) { return l.second < r.second ; });
353+ auto lMaxIter = std::max_element (std::begin (mFreeRanges ), std::end (mFreeRanges ),
354+ [](const auto & l, const auto & r) {
355+ return (l.first .upper () - l.first .lower ()) < (r.first .upper () - r.first .lower ());
356+ });
362357
363358 // check if the size is adequate
364- if (pSize > lMaxIter->second ) {
359+ const auto lFoudSize = lMaxIter->first .upper () - lMaxIter->first .lower ();
360+ if (pSize > lFoudSize) {
361+ return false ;
362+ }
363+
364+ if (lMaxIter->second > 1 ) {
365+ EDDLOG (" RegionAllocator BUG: Overlapping interval found: ptr={:p} length={} overlaps={}" ,
366+ reinterpret_cast <char *>(lMaxIter->first .lower ()), lFoudSize, lMaxIter->second );
367+
368+ // erase this segment
369+ mFree -= lFoudSize;
370+ mFreeRanges .erase (lMaxIter);
365371 return false ;
366372 }
367373
368374 // return the extent
369- mStart = const_cast <char *>(lMaxIter->first );
370- mLength = lMaxIter-> second ;
371- mFrees .erase (lMaxIter);
375+ mStart = reinterpret_cast <char *>(lMaxIter->first . lower () );
376+ mLength = lFoudSize ;
377+ mFreeRanges .erase (lMaxIter);
372378
373379 {
374380 // estimated fragmentation
375381 static thread_local double sFree = 0.0 ;
376382 static thread_local double sNumFragments = 0.0 ;
377383 static thread_local double sFragmentation = 0.0 ;
378384
379- const auto lFree = mFree . load (std::memory_order_acquire) ;
385+ const std:: size_t lFree = mFree ;
380386 sFree = sFree * 0.75 + double (lFree) * 0.25 ;
381387 sFragmentation = sFragmentation * 0.75 + double (lFree - mLength )/double (lFree) * 0.25 ;
382- sNumFragments = sNumFragments *0.75 + double (mFrees . size () + 1 ) * 0.25 ;
388+ sNumFragments = sNumFragments * 0.75 + double (mFreeRanges . iterative_size () + 1 ) * 0.25 ;
383389
384390 DDLOGF_RL (5000 , DataDistSeverity::debug, " DataRegionResource {} estimated: free={:.4} num_fragments={:.4} "
385391 " fragmentation={:.4}" , mSegmentName , sFree , sNumFragments , sFragmentation );
@@ -393,61 +399,19 @@ class RegionAllocatorResource
393399 // align up
394400 pSize = align_size_up (pSize);
395401
396- const char *lData = reinterpret_cast <const char *>(pData);
397-
398- // push object to the free map. Try to merge nodes
399- const auto lIter = mFrees .lower_bound (lData);
400- bool lInserted = false ;
402+ mFreeRanges += std::make_pair (icl::discrete_interval<std::size_t >::right_open (
403+ reinterpret_cast <std::size_t >(pData), reinterpret_cast <std::size_t >(pData) + pSize),
404+ std::size_t (1 )
405+ );
401406
402407#if !defined(NDEBUG)
403- if (lIter != mFrees .end ()) {
404- if (lIter->first <= lData) {
405- EDDLOG (" iter={:p}, data={:p}, Free map:" , lIter->first , lData);
406-
407- for (const auto &lit : mFrees ) {
408- EDDLOG (" iter={:p}, size={}" , lit.first , lit.second );
409- }
408+ for (const auto &lInt : mFreeRanges ) {
409+ if (lInt.second > 1 ) {
410+ EDDLOG (" RegionAllocator BUG: Overlapping interval found on reclaim: ptr={:p} length={} overlaps={}" ,
411+ reinterpret_cast <char *>(lInt.first .lower ()), lInt.first .upper () - lInt.first .lower (), lInt.second );
410412 }
411- assert (lIter->first > lData); // we cannot have this exact value in the free list
412413 }
413414#endif
414-
415- // check if we can merge with the previous
416- if (!mFrees .empty () && lIter != mFrees .begin ()) {
417- auto lPrev = std::prev (lIter);
418-
419- if ((lPrev->first + lPrev->second ) == lData) {
420- lPrev->second += pSize;
421- lInserted = true ;
422-
423- // check if we also can merge with the next (lIter)
424- if (lIter != mFrees .end ()) {
425- if ((lPrev->first + lPrev->second ) == lIter->first ) {
426- lPrev->second += lIter->second ;
427- mFrees .erase (lIter);
428- }
429- }
430- }
431- }
432-
433- if (!lInserted) {
434- // insert the new range
435- auto lIt = mFrees .emplace_hint (lIter, lData, pSize);
436-
437- if (lIt->second != pSize) {
438- EDDLOG (" BUG: RegionAllocatorResource: REPEATED INSERT!!! "
439- " {:p} : {}, original size: {}" , lIt->first , pSize, lIt->second );
440- }
441-
442- // check if we can merge with the next
443- auto lNextIt = std::next (lIt);
444- if (lNextIt != mFrees .cend ()) {
445- if ((lData + pSize) == lNextIt->first ) {
446- lIt->second += lNextIt->second ;
447- mFrees .erase (lNextIt);
448- }
449- }
450- }
451415 }
452416
453417 // / fields
@@ -461,12 +425,12 @@ class RegionAllocatorResource
461425 char *mStart = nullptr ;
462426 std::size_t mLength = 0 ;
463427
464- // two step reclaim to avoid lock contention in the allocation path
465- std::mutex mReclaimLock ;
466- std::map<const char *, std::size_t > mFrees ; // keep all returned blocks
467-
468428 // free space accounting
469429 std::atomic_int64_t mFree = 0 ;
430+
431+ // two step reclaim to avoid lock contention in the allocation path
432+ std::mutex mReclaimLock ;
433+ icl::interval_map<std::size_t , std::size_t > mFreeRanges ;
470434};
471435
472436
0 commit comments