From b294260c5f34a412d1385fec1d9408ad957fb7b1 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Wed, 22 Jul 2020 22:10:11 -0500 Subject: [PATCH 1/3] Fix assertion failure from erroneous unlock in Adaptive OBIM. --- libgalois/include/galois/worklists/AdaptiveObim.h | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/libgalois/include/galois/worklists/AdaptiveObim.h b/libgalois/include/galois/worklists/AdaptiveObim.h index e3ef9d5965..086433f700 100644 --- a/libgalois/include/galois/worklists/AdaptiveObim.h +++ b/libgalois/include/galois/worklists/AdaptiveObim.h @@ -296,8 +296,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { ((double)(p.slowPopsLastPeriod) / (double)(p.sinceLastFix)) > 1.0 / (double)(chunk_size)) { for (unsigned i = 1; i < runtime::activeThreads; ++i) { - while (current.getRemote(i)->lock.try_lock()) - ; + current.getRemote(i)->lock.lock(); } unsigned long priosCreatedThisPeriod = 0; unsigned long numPushesThisStep = 0; @@ -358,8 +357,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { delta = 0; for (unsigned i = 1; i < runtime::activeThreads; ++i) { - while (current.getRemote(i)->lock.try_lock()) - ; + current.getRemote(i)->lock.lock(); } for (unsigned i = 0; i < runtime::activeThreads; ++i) { @@ -473,8 +471,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { void push(const value_type& val) { ThreadData& p = *current.getLocal(); - while (!p.lock.try_lock()) - ; + std::lock_guard> lk{p.lock}; Index ind = indexer(val); deltaIndex index; index.k = ind; @@ -490,7 +487,6 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { // Fast path if (index == p.curIndex && p.current) { p.current->push(val); - p.lock.unlock(); return; } @@ -507,7 +503,6 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { p.current = lC; } lC->push(val); - p.lock.unlock(); } template @@ -526,8 +521,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { galois::optional pop() { ThreadData& p = *current.getLocal(); - while (!p.lock.try_lock()) - ; + p.lock.lock(); p.sinceLastFix++; From 8ad77b83277ff6f672d3f94b81692ad0295f26b4 Mon Sep 17 00:00:00 2001 From: Bozhi YOU Date: Sun, 26 Jul 2020 11:50:46 -0500 Subject: [PATCH 2/3] Fix lock and divided-by-zero bugs. Rename variables by the convention of OBIM. --- .../include/galois/worklists/AdaptiveObim.h | 232 +++++++++--------- 1 file changed, 119 insertions(+), 113 deletions(-) diff --git a/libgalois/include/galois/worklists/AdaptiveObim.h b/libgalois/include/galois/worklists/AdaptiveObim.h index 086433f700..4783a034d1 100644 --- a/libgalois/include/galois/worklists/AdaptiveObim.h +++ b/libgalois/include/galois/worklists/AdaptiveObim.h @@ -55,24 +55,22 @@ namespace worklists { * int operator()(Item i) const { return i.index; } * }; * - * typedef galois::WorkList::AdaptiveOrderedByIntegerMetric WL; + * typedef galois::worklists::AdaptiveOrderedByIntegerMetric WL; * galois::for_each(items.begin(), items.end(), Fn); * \endcode * - * @tparam Indexer Indexer class - * @tparam Container Scheduler for each bucket - * @tparam BlockPeriod Check for higher priority work every 2^BlockPeriod - * iterations - * @tparam BSP Use back-scan prevention - * @tparam uniformBSP Use uniform back-scan prevention - * @tparam T Work item type - * @tparam Index Indexer return type - * @tparam UseMonotonic Assume that an activity at priority p will not - * schedule work at priority p or any priority p1 - * where p1 < p. - * @tparam UseDescending Use descending order instead - * @tparam Concurrent Whether or not to allow concurrent execution - * + * @tparam Indexer Indexer class + * @tparam Container Scheduler for each bucket + * @tparam BlockPeriod Check for higher priority work every 2^BlockPeriod + * iterations + * @tparam BSP Use back-scan prevention + * @tparam uniformBSP Use uniform back-scan prevention + * @tparam T Work item type + * @tparam Index Indexer return type + * @tparam UseMonotonic Assume that an activity at priority p will not + * schedule work at priority p or any priority p1 where p1 < p. + * @tparam UseDescending Use descending order instead + * @tparam Concurrent Whether or not to allow concurrent execution */ template , typename Container = PerSocketChunkFIFO<>, int BlockPeriod = 0, @@ -80,27 +78,26 @@ template , typename T = int, typename Index = int, bool UseMonotonic = false, bool UseDescending = false, bool Concurrent = true> struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { - template - using rethread = AdaptiveOrderedByIntegerMetric< - Indexer, typename Container::template rethread, BlockPeriod, - BSP, uniformBSP, chunk_size, T, Index, UseMonotonic, UseDescending, - Concurrent_>; - - template + template using retype = AdaptiveOrderedByIntegerMetric< - Indexer, typename Container::template retype, BlockPeriod, BSP, - uniformBSP, chunk_size, T_, typename std::result_of::type, + Indexer, typename Container::template retype<_T>, BlockPeriod, BSP, + uniformBSP, chunk_size, _T, typename std::result_of::type, UseMonotonic, UseDescending, Concurrent>; - template + template + using rethread = AdaptiveOrderedByIntegerMetric< + Indexer, typename Container::template rethread<_b>, BlockPeriod, BSP, + uniformBSP, chunk_size, T, Index, UseMonotonic, UseDescending, _b>; + + template using with_block_period = - AdaptiveOrderedByIntegerMetric; - template + template using with_container = - AdaptiveOrderedByIntegerMetric; @@ -228,14 +225,20 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { unsigned int lastMasterVersion; unsigned int numPops; - unsigned int sinceLastFix; - unsigned int slowPopsLastPeriod; - unsigned int pushesLastPeriod; - unsigned int priosLastPeriod; - unsigned long pmodAllDeq; - unsigned int popsFromSameQ; + struct { + unsigned int popsLastFix; + unsigned int slowPopsLastPeriod; + unsigned int pushesLastPeriod; + unsigned int priosLastPeriod; + + unsigned int popsFromSameQ; + + inline double slowPopFreq() { + return ((double)slowPopsLastPeriod / (double)popsLastFix); + } + } counters; unsigned int ctr; Index maxPrioDiffLastPeriod; @@ -245,10 +248,9 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { ThreadData(Index initial) : curIndex(initial, 0), scanStart(initial, 0), current(0), - lastMasterVersion(0), numPops(0), sinceLastFix(0), - slowPopsLastPeriod(0), pushesLastPeriod(0), priosLastPeriod(0), - pmodAllDeq(0), popsFromSameQ(0), ctr(0), maxPrioDiffLastPeriod(0), - minPrio(std::numeric_limits::max()), + lastMasterVersion(0), numPops(0), + pmodAllDeq(0), counters{0, 0, 0, 0, 0}, ctr(0), + maxPrioDiffLastPeriod(0), minPrio(std::numeric_limits::max()), maxPrio(std::numeric_limits::min()) {} }; @@ -256,7 +258,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { // NB: Place dynamically growing masterLog after fixed-size PerThreadStorage // members to give higher likelihood of reclaiming PerThreadStorage - substrate::PerThreadStorage current; + substrate::PerThreadStorage data; substrate::PaddedLock masterLock; MasterLog masterLog; @@ -287,16 +289,16 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { GALOIS_ATTRIBUTE_NOINLINE galois::optional slowPop(ThreadData& p) { // Failed, find minimum bin - p.slowPopsLastPeriod++; + p.counters.slowPopsLastPeriod++; unsigned myID = galois::substrate::ThreadPool::getTID(); // first give it some time // then check the fdeq frequency - if (myID == 0 && p.sinceLastFix > counter && - ((double)(p.slowPopsLastPeriod) / (double)(p.sinceLastFix)) > - 1.0 / (double)(chunk_size)) { + if (myID == 0 && p.counters.popsLastFix > counter && + p.counters.slowPopFreq() > 1.0 / (double)(chunk_size)) { for (unsigned i = 1; i < runtime::activeThreads; ++i) { - current.getRemote(i)->lock.lock(); + while (!data.getRemote(i)->lock.try_lock()) + ; } unsigned long priosCreatedThisPeriod = 0; unsigned long numPushesThisStep = 0; @@ -304,26 +306,27 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { Index minOfMin = std::numeric_limits::max(); Index maxOfMax = std::numeric_limits::min(); for (unsigned i = 0; i < runtime::activeThreads; ++i) { - Index& otherMinPrio = current.getRemote(i)->minPrio; + Index& otherMinPrio = data.getRemote(i)->minPrio; minOfMin = compare(minOfMin, otherMinPrio) ? minOfMin : otherMinPrio; - Index& otherMaxPrio = current.getRemote(i)->maxPrio; + Index& otherMaxPrio = data.getRemote(i)->maxPrio; maxOfMax = compare(otherMaxPrio, maxOfMax) ? maxOfMax : otherMaxPrio; - priosCreatedThisPeriod += current.getRemote(i)->priosLastPeriod; - numPushesThisStep += current.getRemote(i)->pushesLastPeriod; - allPmodDeqCounts += current.getRemote(i)->pmodAllDeq; - current.getRemote(i)->sinceLastFix = 0; - current.getRemote(i)->slowPopsLastPeriod = 0; - current.getRemote(i)->pushesLastPeriod = 0; - current.getRemote(i)->priosLastPeriod = 0; - current.getRemote(i)->maxPrioDiffLastPeriod = 0; - - current.getRemote(i)->minPrio = std::numeric_limits::max(); - current.getRemote(i)->maxPrio = std::numeric_limits::min(); + priosCreatedThisPeriod += data.getRemote(i)->counters.priosLastPeriod; + numPushesThisStep += data.getRemote(i)->counters.pushesLastPeriod; + allPmodDeqCounts += data.getRemote(i)->pmodAllDeq; + data.getRemote(i)->counters.popsLastFix = 0; + data.getRemote(i)->counters.slowPopsLastPeriod = 0; + data.getRemote(i)->counters.pushesLastPeriod = 0; + data.getRemote(i)->counters.priosLastPeriod = 0; + data.getRemote(i)->maxPrioDiffLastPeriod = 0; + + data.getRemote(i)->minPrio = std::numeric_limits::max(); + data.getRemote(i)->maxPrio = std::numeric_limits::min(); } - if (((double)numPushesThisStep / + if ((double)numPushesThisStep && + ((double)numPushesThisStep / ((double)((maxOfMax >> delta) - (minOfMin >> delta)))) < - chunk_size / 2) { + chunk_size / 2) { double xx = ((double)(chunk_size) / ((double)numPushesThisStep / ((double)((maxOfMax >> delta) - (minOfMin >> delta))))); @@ -332,17 +335,17 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { } for (unsigned i = 1; i < runtime::activeThreads; ++i) { - current.getRemote(i)->lock.unlock(); + data.getRemote(i)->lock.unlock(); } } #ifdef UNMERGE_ENABLED // serif added here // make sure delta is bigger than 0 so that we can actually unmerge things // give it some time and check the same queue pops - else if (delta > 0 && myID == 0 && p.sinceLastFix > counter && - p.popsFromSameQ > 4 * chunk_size) { + else if (delta > 0 && myID == 0 && p.counters.popsLastFix > counter && + p.counters.popsFromSameQ > 4 * chunk_size) { if (((p.maxPrio >> delta) - (p.minPrio >> delta)) < 16 && - ((double)p.pushesLastPeriod / + ((double)p.counters.pushesLastPeriod / ((double)((p.maxPrio >> delta) - (p.minPrio >> delta)))) > 4 * chunk_size) { // this is a check to make sure we are also // pushing with the same frequency end of @@ -357,30 +360,31 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { delta = 0; for (unsigned i = 1; i < runtime::activeThreads; ++i) { - current.getRemote(i)->lock.lock(); + while (!data.getRemote(i)->lock.try_lock()) + ; } for (unsigned i = 0; i < runtime::activeThreads; ++i) { - current.getRemote(i)->sinceLastFix = 0; - current.getRemote(i)->slowPopsLastPeriod = 0; - current.getRemote(i)->pushesLastPeriod = 0; - current.getRemote(i)->priosLastPeriod = 0; - current.getRemote(i)->maxPrioDiffLastPeriod = 0; + data.getRemote(i)->counters.popsLastFix = 0; + data.getRemote(i)->counters.slowPopsLastPeriod = 0; + data.getRemote(i)->counters.pushesLastPeriod = 0; + data.getRemote(i)->counters.priosLastPeriod = 0; + data.getRemote(i)->maxPrioDiffLastPeriod = 0; - current.getRemote(i)->minPrio = std::numeric_limits::max(); - current.getRemote(i)->maxPrio = std::numeric_limits::min(); + data.getRemote(i)->minPrio = std::numeric_limits::max(); + data.getRemote(i)->maxPrio = std::numeric_limits::min(); } for (unsigned i = 1; i < runtime::activeThreads; ++i) { - current.getRemote(i)->lock.unlock(); + data.getRemote(i)->lock.unlock(); } p.ctr++; } - p.popsFromSameQ = 0; + p.counters.popsFromSameQ = 0; } #endif - p.popsFromSameQ = 0; + p.counters.popsFromSameQ = 0; updateLocal(p); bool localLeader = substrate::ThreadPool::isLeader(); @@ -392,24 +396,23 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { msS = p.scanStart; if (localLeader || uniformBSP) { for (unsigned i = 0; i < runtime::activeThreads; ++i) { - msS = std::min(msS, current.getRemote(i)->scanStart); + msS = std::min(msS, data.getRemote(i)->scanStart); } } else { msS = std::min( - msS, - current.getRemote(substrate::ThreadPool::getLeader())->scanStart); + msS, data.getRemote(substrate::ThreadPool::getLeader())->scanStart); } } - for (auto ii = p.local.lower_bound(msS), ee = p.local.end(); ii != ee; + for (auto ii = p.local.lower_bound(msS), ei = p.local.end(); ii != ei; ++ii) { - galois::optional retval; - if ((retval = ii->second->pop())) { + galois::optional item; + if ((item = ii->second->pop())) { p.current = ii->second; p.curIndex = ii->first; p.scanStart = ii->first; p.lock.unlock(); - return retval; + return item; } } p.lock.unlock(); @@ -433,7 +436,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { p.lastMasterVersion = masterVersion.load(std::memory_order_relaxed) + 1; masterLog.push_back(std::make_pair(i, lC2)); masterVersion.fetch_add(1); - p.priosLastPeriod++; + p.counters.priosLastPeriod++; } masterLock.unlock(); return lC2; @@ -451,13 +454,13 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { public: AdaptiveOrderedByIntegerMetric(const Indexer& x = Indexer()) - : current(earliest), heap(sizeof(CTy)), masterVersion(0), indexer(x) { + : data(earliest), heap(sizeof(CTy)), masterVersion(0), indexer(x) { delta = 0; counter = chunk_size; } ~AdaptiveOrderedByIntegerMetric() { - ThreadData& p = *current.getLocal(); + ThreadData& p = *data.getLocal(); updateLocal(p); // Deallocate in LIFO order to give opportunity for simple garbage // collection @@ -470,75 +473,77 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { } void push(const value_type& val) { - ThreadData& p = *current.getLocal(); - std::lock_guard> lk{p.lock}; - Index ind = indexer(val); deltaIndex index; - index.k = ind; - index.d = delta; + ThreadData& p = *data.getLocal(); + + while (!p.lock.try_lock()) + ; + Index ind = indexer(val); + index.k = ind; + index.d = delta; if (index.k > p.maxPrio) { p.maxPrio = index.k; } if (index.k < p.minPrio) { p.minPrio = index.k; } - p.pushesLastPeriod++; + p.counters.pushesLastPeriod++; // Fast path if (index == p.curIndex && p.current) { p.current->push(val); + p.lock.unlock(); return; } // Slow path - CTy* lC = updateLocalOrCreate(p, index); + CTy* C = updateLocalOrCreate(p, index); if (BSP && index < p.scanStart) p.scanStart = index; // Opportunistically move to higher priority work if (index < p.curIndex) { // we moved to a higher prio - p.popsFromSameQ = 0; + p.counters.popsFromSameQ = 0; p.curIndex = index; - p.current = lC; + p.current = C; } - lC->push(val); + C->push(val); + p.lock.unlock(); } template - unsigned int push(Iter b, Iter e) { - int npush; + size_t push(Iter b, Iter e) { + size_t npush; for (npush = 0; b != e; npush++) push(*b++); return npush; } template - unsigned int push_initial(const RangeTy& range) { + size_t push_initial(const RangeTy& range) { auto rp = range.local_pair(); return push(rp.first, rp.second); } galois::optional pop() { - ThreadData& p = *current.getLocal(); - p.lock.lock(); - - p.sinceLastFix++; + ThreadData& p = *data.getLocal(); + while (!p.lock.try_lock()) + ; + CTy* C = p.current; + p.counters.popsLastFix++; unsigned myID = galois::substrate::ThreadPool::getTID(); + data.getRemote(myID)->pmodAllDeq++; - current.getRemote(myID)->pmodAllDeq++; - - CTy* C = p.current; - if (BlockPeriod && - (BlockPeriod < 0 || ((p.numPops++ & ((1ull << BlockPeriod) - 1)) == 0))) + if (BlockPeriod && ((p.numPops++ & ((1ull << BlockPeriod) - 1)) == 0)) { return slowPop(p); - - galois::optional retval; - if (C && (retval = C->pop())) { - p.popsFromSameQ++; + } + galois::optional item; + if (C && (item = C->pop())) { + p.counters.popsFromSameQ++; p.lock.unlock(); - return retval; + return item; } // Slow path @@ -546,7 +551,8 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { } }; GALOIS_WLCOMPILECHECK(AdaptiveOrderedByIntegerMetric) -} // namespace worklists -} // namespace galois + +} // end namespace worklists +} // end namespace galois #endif From c9d964b9bf3ad993627de8060859f2f11817a9a5 Mon Sep 17 00:00:00 2001 From: Bozhi YOU Date: Mon, 27 Jul 2020 04:24:05 -0500 Subject: [PATCH 3/3] Fix init value of earliest; update code structure w.r.t OBIM --- .../include/galois/worklists/AdaptiveObim.h | 345 ++++++++++-------- 1 file changed, 190 insertions(+), 155 deletions(-) diff --git a/libgalois/include/galois/worklists/AdaptiveObim.h b/libgalois/include/galois/worklists/AdaptiveObim.h index 4783a034d1..6007ca43cd 100644 --- a/libgalois/include/galois/worklists/AdaptiveObim.h +++ b/libgalois/include/galois/worklists/AdaptiveObim.h @@ -21,8 +21,6 @@ #ifndef GALOIS_WORKLIST_ADAPTIVEOBIM_H #define GALOIS_WORKLIST_ADAPTIVEOBIM_H -// #define UNMERGE_ENABLED - #include #include #include @@ -30,6 +28,7 @@ #include #include "galois/config.h" + #include "galois/FlatMap.h" #include "galois/Timer.h" #include "galois/substrate/PaddedLock.h" @@ -40,6 +39,32 @@ namespace galois { namespace worklists { +namespace internal { + +template +struct AdaptiveOrderedByIntegerMetricComparator { + typedef std::less compare_t; + Index identity; + Index earliest; + + AdaptiveOrderedByIntegerMetricComparator() + : identity(std::numeric_limits::max()), + earliest(std::numeric_limits::min()) {} +}; + +template +struct AdaptiveOrderedByIntegerMetricComparator { + typedef std::greater compare_t; + Index identity; + Index earliest; + + AdaptiveOrderedByIntegerMetricComparator() + : identity(std::numeric_limits::min()), + earliest(std::numeric_limits::max()) {} +}; + +} // namespace internal + /** * Approximate priority scheduling. Indexer is a default-constructable class * whose instances conform to R r = indexer(item) where R is some @@ -66,7 +91,7 @@ namespace worklists { * @tparam BSP Use back-scan prevention * @tparam uniformBSP Use uniform back-scan prevention * @tparam T Work item type - * @tparam Index Indexer return type + * @tparam Index Indexer return type * @tparam UseMonotonic Assume that an activity at priority p will not * schedule work at priority p or any priority p1 where p1 < p. * @tparam UseDescending Use descending order instead @@ -75,70 +100,91 @@ namespace worklists { template , typename Container = PerSocketChunkFIFO<>, int BlockPeriod = 0, bool BSP = true, bool uniformBSP = false, int chunk_size = 64, - typename T = int, typename Index = int, bool UseMonotonic = false, - bool UseDescending = false, bool Concurrent = true> -struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { + typename T = int, typename Index = int, bool EnableUmerge = false, + bool UseMonotonic = false, bool UseDescending = false, + bool Concurrent = true> +struct AdaptiveOrderedByIntegerMetric + : private boost::noncopyable, + public internal::AdaptiveOrderedByIntegerMetricComparator { template using retype = AdaptiveOrderedByIntegerMetric< Indexer, typename Container::template retype<_T>, BlockPeriod, BSP, uniformBSP, chunk_size, _T, typename std::result_of::type, - UseMonotonic, UseDescending, Concurrent>; + EnableUmerge, UseMonotonic, UseDescending, Concurrent>; template using rethread = AdaptiveOrderedByIntegerMetric< Indexer, typename Container::template rethread<_b>, BlockPeriod, BSP, - uniformBSP, chunk_size, T, Index, UseMonotonic, UseDescending, _b>; + uniformBSP, chunk_size, T, Index, EnableUmerge, UseMonotonic, + UseDescending, _b>; template - using with_block_period = - AdaptiveOrderedByIntegerMetric; - + struct with_block_period { + typedef AdaptiveOrderedByIntegerMetric< + Indexer, Container, _period, BSP, uniformBSP, chunk_size, T, Index, + EnableUmerge, UseMonotonic, UseDescending, Concurrent> + type; + }; template - using with_container = - AdaptiveOrderedByIntegerMetric; - - template - using with_indexer = - AdaptiveOrderedByIntegerMetric; - - template - using with_back_scan_prevention = - AdaptiveOrderedByIntegerMetric; - - template - using with_monotonic = - AdaptiveOrderedByIntegerMetric; - - template - using with_descending = - AdaptiveOrderedByIntegerMetric; - - using compare_t = - std::conditional_t, std::less>; + struct with_container { + typedef AdaptiveOrderedByIntegerMetric< + Indexer, _container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index, + EnableUmerge, UseMonotonic, UseDescending, Concurrent> + type; + }; + + template + struct with_indexer { + AdaptiveOrderedByIntegerMetric< + _indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index, + EnableUmerge, UseMonotonic, UseDescending, Concurrent> + type; + }; + + template + struct with_back_scan_prevention { + typedef AdaptiveOrderedByIntegerMetric< + Indexer, Container, BlockPeriod, _bsp, uniformBSP, chunk_size, T, Index, + EnableUmerge, UseMonotonic, UseDescending, Concurrent> + type; + }; + + template + struct with_unmerge { + AdaptiveOrderedByIntegerMetric< + Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index, + _enable_unmerge, UseMonotonic, UseDescending, Concurrent> + type; + }; + + template + struct with_monotonic { + AdaptiveOrderedByIntegerMetric< + Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index, + EnableUmerge, _use_monotonic, UseDescending, Concurrent> + type; + }; + + template + struct with_descending { + AdaptiveOrderedByIntegerMetric< + Indexer, Container, BlockPeriod, BSP, uniformBSP, chunk_size, T, Index, + EnableUmerge, UseMonotonic, _use_descending, Concurrent> + type; + }; typedef T value_type; typedef Index index_type; + typedef uint32_t delta_type; private: - static inline compare_t compare; - static constexpr Index earliest = UseDescending - ? std::numeric_limits::min() - : std::numeric_limits::max(); - // static constexpr identity = UseDescending ? - // std::numeric_limits::max() : std::numeric_limits::min(); - unsigned int delta; + typedef typename Container::template rethread CTy; + typedef internal::AdaptiveOrderedByIntegerMetricComparator + Comparator; + static inline typename Comparator::compare_t compare; + delta_type delta; unsigned int counter; unsigned int maxIndex; unsigned int lastSizeMasterLog; @@ -147,13 +193,12 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { // smaller delta insertions are prioritirized struct deltaIndex { Index k; // note: original index is stored here - unsigned int d; + delta_type d; // taking the max of deltas and doing right shift eq. shifting priority with // d-max(d1, d2) deltaIndex() : k(0), d(0) {} - deltaIndex(Index k1, unsigned int d1) : k(k1), d(d1) {} - unsigned int id() { return k; } + deltaIndex(Index k1, delta_type d1) : k(k1), d(d1) {} bool operator==(const deltaIndex& a) const { unsigned delt = std::max(d, a.d); Index a1 = k >> delt; @@ -214,7 +259,6 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { } }; - typedef typename Container::template rethread CTy; typedef galois::flat_map LMapTy; struct ThreadData { @@ -225,32 +269,43 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { unsigned int lastMasterVersion; unsigned int numPops; - unsigned long pmodAllDeq; - + unsigned int popsLastFix; + unsigned int slowPopsLastPeriod; + unsigned int pushesLastPeriod; + unsigned int popsFromSameQ; struct { - unsigned int popsLastFix; - unsigned int slowPopsLastPeriod; - unsigned int pushesLastPeriod; + size_t pmodAllDeq; unsigned int priosLastPeriod; - - unsigned int popsFromSameQ; - - inline double slowPopFreq() { - return ((double)slowPopsLastPeriod / (double)popsLastFix); - } - } counters; - unsigned int ctr; - - Index maxPrioDiffLastPeriod; + unsigned int numUmerges; + Index maxPrioDiffLastPeriod; + } stats; Index minPrio; Index maxPrio; substrate::PaddedLock lock; + void cleanup() { + popsLastFix = 0; + slowPopsLastPeriod = 0; + pushesLastPeriod = 0; + + stats.priosLastPeriod = 0; + stats.maxPrioDiffLastPeriod = 0; + + minPrio = std::numeric_limits::max(); + maxPrio = std::numeric_limits::min(); + } + + inline bool isSlowPopFreq(double threshold) { + // return ((double)slowPopsLastPeriod / (double)popsLastFix) > threshold; + return ((double)slowPopsLastPeriod > (double)popsLastFix) * threshold; + } + ThreadData(Index initial) : curIndex(initial, 0), scanStart(initial, 0), current(0), - lastMasterVersion(0), numPops(0), - pmodAllDeq(0), counters{0, 0, 0, 0, 0}, ctr(0), - maxPrioDiffLastPeriod(0), minPrio(std::numeric_limits::max()), + lastMasterVersion(0), numPops(0), popsLastFix(0), + slowPopsLastPeriod(0), pushesLastPeriod(0), + popsFromSameQ(0), stats{0, 0, 0, 0}, + minPrio(std::numeric_limits::max()), maxPrio(std::numeric_limits::min()) {} }; @@ -289,63 +344,57 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { GALOIS_ATTRIBUTE_NOINLINE galois::optional slowPop(ThreadData& p) { // Failed, find minimum bin - p.counters.slowPopsLastPeriod++; + p.slowPopsLastPeriod++; unsigned myID = galois::substrate::ThreadPool::getTID(); // first give it some time // then check the fdeq frequency - if (myID == 0 && p.counters.popsLastFix > counter && - p.counters.slowPopFreq() > 1.0 / (double)(chunk_size)) { + if (myID == 0 && p.popsLastFix > counter && + p.isSlowPopFreq(1.0 / (double)(chunk_size))) { + unsigned long numPushesThisStep = p.pushesLastPeriod; + unsigned long priosCreatedThisPeriod = p.stats.priosLastPeriod; + unsigned long allPmodDeqCounts = p.stats.pmodAllDeq; + Index minOfMin = p.minPrio; + Index maxOfMax = p.maxPrio; + p.cleanup(); for (unsigned i = 1; i < runtime::activeThreads; ++i) { while (!data.getRemote(i)->lock.try_lock()) ; - } - unsigned long priosCreatedThisPeriod = 0; - unsigned long numPushesThisStep = 0; - unsigned long allPmodDeqCounts = 0; - Index minOfMin = std::numeric_limits::max(); - Index maxOfMax = std::numeric_limits::min(); - for (unsigned i = 0; i < runtime::activeThreads; ++i) { + Index& otherMinPrio = data.getRemote(i)->minPrio; - minOfMin = compare(minOfMin, otherMinPrio) ? minOfMin : otherMinPrio; + minOfMin = std::min(minOfMin, otherMinPrio, compare); Index& otherMaxPrio = data.getRemote(i)->maxPrio; - maxOfMax = compare(otherMaxPrio, maxOfMax) ? maxOfMax : otherMaxPrio; - priosCreatedThisPeriod += data.getRemote(i)->counters.priosLastPeriod; - numPushesThisStep += data.getRemote(i)->counters.pushesLastPeriod; - allPmodDeqCounts += data.getRemote(i)->pmodAllDeq; - data.getRemote(i)->counters.popsLastFix = 0; - data.getRemote(i)->counters.slowPopsLastPeriod = 0; - data.getRemote(i)->counters.pushesLastPeriod = 0; - data.getRemote(i)->counters.priosLastPeriod = 0; - data.getRemote(i)->maxPrioDiffLastPeriod = 0; - - data.getRemote(i)->minPrio = std::numeric_limits::max(); - data.getRemote(i)->maxPrio = std::numeric_limits::min(); - } + maxOfMax = std::max(otherMaxPrio, maxOfMax, compare); + numPushesThisStep += data.getRemote(i)->pushesLastPeriod; + priosCreatedThisPeriod += data.getRemote(i)->stats.priosLastPeriod; + allPmodDeqCounts += data.getRemote(i)->stats.pmodAllDeq; - if ((double)numPushesThisStep && - ((double)numPushesThisStep / - ((double)((maxOfMax >> delta) - (minOfMin >> delta)))) < - chunk_size / 2) { - double xx = ((double)(chunk_size) / - ((double)numPushesThisStep / - ((double)((maxOfMax >> delta) - (minOfMin >> delta))))); - delta += std::floor(std::log2(xx)); - counter *= 2; + data.getRemote(i)->cleanup(); + data.getRemote(i)->lock.unlock(); } - for (unsigned i = 1; i < runtime::activeThreads; ++i) { - data.getRemote(i)->lock.unlock(); + if ((double)numPushesThisStep) { + Index prioRange = (maxOfMax >> delta) - (minOfMin >> delta); + // Division is expensive + // double fillRatio = ((double)numPushesThisStep / (double)prioRange); + if (numPushesThisStep < (chunk_size >> 1) * prioRange) { + // Ditto + // double xx = ((double)(chunk_size) / fillRatio); + double xx = std::log2(chunk_size) - std::log2(numPushesThisStep) + + std::log2(prioRange); + assert(xx); + delta += std::floor(xx); + counter <<= 1; + } } } -#ifdef UNMERGE_ENABLED // serif added here // make sure delta is bigger than 0 so that we can actually unmerge things // give it some time and check the same queue pops - else if (delta > 0 && myID == 0 && p.counters.popsLastFix > counter && - p.counters.popsFromSameQ > 4 * chunk_size) { + else if (EnableUmerge && delta > 0 && myID == 0 && + p.popsLastFix > counter && p.popsFromSameQ > (chunk_size << 2)) { if (((p.maxPrio >> delta) - (p.minPrio >> delta)) < 16 && - ((double)p.counters.pushesLastPeriod / + ((double)p.pushesLastPeriod / ((double)((p.maxPrio >> delta) - (p.minPrio >> delta)))) > 4 * chunk_size) { // this is a check to make sure we are also // pushing with the same frequency end of @@ -359,39 +408,24 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { else delta = 0; + p.cleanup(); for (unsigned i = 1; i < runtime::activeThreads; ++i) { while (!data.getRemote(i)->lock.try_lock()) ; - } - - for (unsigned i = 0; i < runtime::activeThreads; ++i) { - - data.getRemote(i)->counters.popsLastFix = 0; - data.getRemote(i)->counters.slowPopsLastPeriod = 0; - data.getRemote(i)->counters.pushesLastPeriod = 0; - data.getRemote(i)->counters.priosLastPeriod = 0; - data.getRemote(i)->maxPrioDiffLastPeriod = 0; - - data.getRemote(i)->minPrio = std::numeric_limits::max(); - data.getRemote(i)->maxPrio = std::numeric_limits::min(); - } - - for (unsigned i = 1; i < runtime::activeThreads; ++i) { + data.getRemote(i)->cleanup(); data.getRemote(i)->lock.unlock(); } - p.ctr++; + p.stats.numUmerges++; } - p.counters.popsFromSameQ = 0; + p.popsFromSameQ = 0; } -#endif - p.counters.popsFromSameQ = 0; + // p.popsFromSameQ = 0; - updateLocal(p); bool localLeader = substrate::ThreadPool::isLeader(); + deltaIndex msS(this->earliest, 0); + + updateLocal(p); - deltaIndex msS; - msS.k = std::numeric_limits::min(); - msS.d = 0; if (BSP && !UseMonotonic) { msS = p.scanStart; if (localLeader || uniformBSP) { @@ -415,6 +449,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { return item; } } + p.lock.unlock(); return galois::optional(); } @@ -423,23 +458,23 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { CTy* slowUpdateLocalOrCreate(ThreadData& p, deltaIndex i) { // update local until we find it or we get the write lock do { - CTy* lC; updateLocal(p); + CTy* lC; if ((lC = p.local[i])) return lC; } while (!masterLock.try_lock()); // we have the write lock, update again then create updateLocal(p); - CTy*& lC2 = p.local[i]; - if (!lC2) { - lC2 = new (heap.allocate(sizeof(CTy))) CTy(); + CTy*& C2 = p.local[i]; + if (!C2) { + C2 = new (heap.allocate(sizeof(CTy))) CTy(); p.lastMasterVersion = masterVersion.load(std::memory_order_relaxed) + 1; - masterLog.push_back(std::make_pair(i, lC2)); + masterLog.push_back(std::make_pair(i, C2)); masterVersion.fetch_add(1); - p.counters.priosLastPeriod++; + p.stats.priosLastPeriod++; } masterLock.unlock(); - return lC2; + return C2; } inline CTy* updateLocalOrCreate(ThreadData& p, deltaIndex i) { @@ -454,7 +489,7 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { public: AdaptiveOrderedByIntegerMetric(const Indexer& x = Indexer()) - : data(earliest), heap(sizeof(CTy)), masterVersion(0), indexer(x) { + : data(this->earliest), heap(sizeof(CTy)), masterVersion(0), indexer(x) { delta = 0; counter = chunk_size; } @@ -475,19 +510,18 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { void push(const value_type& val) { deltaIndex index; ThreadData& p = *data.getLocal(); - while (!p.lock.try_lock()) ; - Index ind = indexer(val); - index.k = ind; - index.d = delta; + + p.pushesLastPeriod++; + index.k = indexer(val); + index.d = delta; if (index.k > p.maxPrio) { p.maxPrio = index.k; } if (index.k < p.minPrio) { p.minPrio = index.k; } - p.counters.pushesLastPeriod++; // Fast path if (index == p.curIndex && p.current) { @@ -503,12 +537,13 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { // Opportunistically move to higher priority work if (index < p.curIndex) { // we moved to a higher prio - p.counters.popsFromSameQ = 0; + p.popsFromSameQ = 0; p.curIndex = index; p.current = C; } C->push(val); + p.lock.unlock(); } @@ -532,16 +567,16 @@ struct AdaptiveOrderedByIntegerMetric : private boost::noncopyable { ; CTy* C = p.current; - p.counters.popsLastFix++; - unsigned myID = galois::substrate::ThreadPool::getTID(); - data.getRemote(myID)->pmodAllDeq++; + p.popsLastFix++; + p.stats.pmodAllDeq++; - if (BlockPeriod && ((p.numPops++ & ((1ull << BlockPeriod) - 1)) == 0)) { + if (BlockPeriod && ((p.numPops++ & ((1ull << BlockPeriod) - 1)) == 0)) return slowPop(p); - } + galois::optional item; if (C && (item = C->pop())) { - p.counters.popsFromSameQ++; + p.popsFromSameQ++; + p.lock.unlock(); return item; }