diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc22478ac7..bf797e36ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,13 +23,12 @@ jobs: optimization: "debug" assert: "debug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" cc: "clang-15" flags: "-Og -fPIE" - options: "--enable-isystem --enable-avx2 --enable-sse41" + options: "--enable-isystem --enable-avx2 --enable-sse4" packager: "apt" packages: "" @@ -39,7 +38,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "--build-icu --with-icu" secp256k1: "--build-secp256k1" @@ -55,13 +53,12 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" cc: "gcc-11" flags: "-Os -fPIE" - options: "--enable-isystem --enable-sse41" + options: "--enable-isystem --enable-sse4" packager: "apt" packages: "" @@ -71,7 +68,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "cov" - detectcpuflags: "detect" boost: "--build-boost" icu: "--build-icu --with-icu" secp256k1: "--build-secp256k1" @@ -87,7 +83,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" @@ -103,7 +98,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" @@ -142,29 +136,6 @@ jobs: run: | brew install autoconf automake libtool ${{ matrix.packages }} - - name: Determine CPU flags - shell: bash - run: | - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sse4_1 ") ]]; then - echo "CPU_SUPPORT_SSE41=--enable-sse41" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx " | grep " avx2 ") ]]; then - echo "CPU_SUPPORT_AVX2=--enable-avx2" >> $GITHUB_ENV - fi - - if [[ -n 
$(cat /proc/cpuinfo | grep flags | grep " avx512bw ") ]]; then - echo "CPU_SUPPORT_AVX512=--enable-avx512" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sha_ni ") ]]; then - echo "CPU_SUPPORT_SHANI=--enable-shani" >> $GITHUB_ENV - fi - - if [[ ${{ matrix.detectcpuflags }} == 'detect' ]]; then - echo "CPU_SUPPORTED_FLAGS='$CPU_SUPPORT_SSE41 $CPU_SUPPORT_AVX2 $CPU_SUPPORT_AVX512 $CPU_SUPPORT_SHANI'" >> $GITHUB_ENV - fi - - name: Denormalize parameterization shell: bash run: | @@ -203,7 +174,6 @@ jobs: --prefix=${{ env.LIBBITCOIN_SRC_PATH }}prefix ${{ env.LINKAGE }} ${{ env.ASSERT_NDEBUG }} - ${{ env.CPU_SUPPORTED_FLAGS }} ${{ matrix.boost }} ${{ matrix.icu }} ${{ matrix.secp256k1 }} @@ -285,13 +255,12 @@ jobs: optimization: "debug" assert: "debug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" cc: "clang-15" flags: "-Og -fPIE" - options: "-Denable-avx2=on -Denable-sse41=on" + options: "-Denable-avx2=on -Denable-sse4=on" packager: "apt" packages: "" @@ -301,7 +270,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "--build-icu --with-icu" secp256k1: "--build-secp256k1" @@ -317,13 +285,12 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" cc: "gcc-11" flags: "-Os -fPIE" - options: "-Denable-sse41=on" + options: "-Denable-sse4=on" packager: "apt" packages: "" @@ -333,7 +300,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "--build-icu --with-icu" secp256k1: "--build-secp256k1" @@ -349,7 +315,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" @@ -365,7 +330,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - 
detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" @@ -404,29 +368,6 @@ jobs: run: | brew install autoconf automake libtool ${{ matrix.packages }} - - name: Determine CPU flags - shell: bash - run: | - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sse4_1 ") ]]; then - echo "CPU_SUPPORT_SSE41=-Denable-sse41=on" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx " | grep " avx2 ") ]]; then - echo "CPU_SUPPORT_AVX2=-Denable-avx2=on" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx512bw ") ]]; then - echo "CPU_SUPPORT_AVX512=-Denable-avx512=on" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sha_ni ") ]]; then - echo "CPU_SUPPORT_SHANI=-Denable-shani=on" >> $GITHUB_ENV - fi - - if [[ ${{ matrix.detectcpuflags }} == 'detect' ]]; then - echo "CPU_SUPPORTED_FLAGS='$CPU_SUPPORT_SSE41 $CPU_SUPPORT_AVX2 $CPU_SUPPORT_AVX512 $CPU_SUPPORT_SHANI'" >> $GITHUB_ENV - fi - - name: Denormalize parameterization shell: bash run: | @@ -468,7 +409,6 @@ jobs: --prefix=${{ env.LIBBITCOIN_SRC_PATH }}prefix ${{ env.LINKAGE }} ${{ env.ASSERT_NDEBUG }} - ${{ env.CPU_SUPPORTED_FLAGS }} ${{ matrix.boost }} ${{ matrix.icu }} ${{ matrix.secp256k1 }} @@ -561,13 +501,12 @@ jobs: optimization: "debug" assert: "debug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" cc: "clang-15" flags: "-Og -fPIE" - options: "-Denable-avx2=on -Denable-sse41=on" + options: "-Denable-avx2=on -Denable-sse4=on" packager: "apt" packages: "" @@ -578,7 +517,6 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "--build-icu --with-icu" secp256k1: "--build-secp256k1" @@ -595,13 +533,12 @@ jobs: optimization: "size" assert: "ndebug" coverage: "nocov" - detectcpuflags: "ignore" boost: "--build-boost" icu: "" secp256k1: "--build-secp256k1" cc: "gcc-11" flags: "-Os -fPIE" - 
options: "-Denable-sse41=on" + options: "-Denable-sse4=on" packager: "apt" packages: "" @@ -634,29 +571,6 @@ jobs: run: | brew install autoconf automake libtool ${{ matrix.packages }} - - name: Determine CPU flags - shell: bash - run: | - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sse4_1 ") ]]; then - echo "CPU_SUPPORT_SSE41=-Denable-sse41=on" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx " | grep " avx2 ") ]]; then - echo "CPU_SUPPORT_AVX2=-Denable-avx2=on" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx512bw ") ]]; then - echo "CPU_SUPPORT_AVX512=-Denable-avx512=on" >> $GITHUB_ENV - fi - - if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sha_ni ") ]]; then - echo "CPU_SUPPORT_SHANI=-Denable-shani=on" >> $GITHUB_ENV - fi - - if [[ ${{ matrix.detectcpuflags }} == 'detect' ]]; then - echo "CPU_SUPPORTED_FLAGS='$CPU_SUPPORT_SSE41 $CPU_SUPPORT_AVX2 $CPU_SUPPORT_AVX512 $CPU_SUPPORT_SHANI'" >> $GITHUB_ENV - fi - - name: Denormalize parameterization shell: bash run: | @@ -699,7 +613,6 @@ jobs: --preset=${{ matrix.preset }} ${{ env.LINKAGE }} ${{ env.ASSERT_NDEBUG }} - ${{ env.CPU_SUPPORTED_FLAGS }} ${{ matrix.boost }} ${{ matrix.icu }} ${{ matrix.secp256k1 }} diff --git a/Makefile.am b/Makefile.am index 9a6fa4519e..bdcc4e8dc8 100755 --- a/Makefile.am +++ b/Makefile.am @@ -425,6 +425,7 @@ include_bitcoin_system_HEADERS = \ include/bitcoin/system/constraints.hpp \ include/bitcoin/system/define.hpp \ include/bitcoin/system/exceptions.hpp \ + include/bitcoin/system/execution.hpp \ include/bitcoin/system/forks.hpp \ include/bitcoin/system/funclets.hpp \ include/bitcoin/system/have.hpp \ diff --git a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj index 13504ad87a..b3f7b885e5 100644 --- a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj +++ b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj @@ 
-331,6 +331,7 @@ + diff --git a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters index c4080e871a..837e842b9c 100644 --- a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters +++ b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters @@ -869,6 +869,9 @@ include\bitcoin\system + + include\bitcoin\system + include\bitcoin\system diff --git a/include/bitcoin/system.hpp b/include/bitcoin/system.hpp index 5d20e1783e..a269944bed 100755 --- a/include/bitcoin/system.hpp +++ b/include/bitcoin/system.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include diff --git a/include/bitcoin/system/execution.hpp b/include/bitcoin/system/execution.hpp new file mode 100644 index 0000000000..c3d3c5c3e6 --- /dev/null +++ b/include/bitcoin/system/execution.hpp @@ -0,0 +1,2819 @@ +/** + * Thread pool-based implementation of parallel standard library algorithms. + * github.com/alugowski/poolSTL + * + * MIT License: + * + * Copyright (c) 2023 Adam Lugowski + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + + +#ifndef POOLSTL_HPP +#define POOLSTL_HPP + + +#ifndef POOLSTL_EXECUTION_HPP +#define POOLSTL_EXECUTION_HPP + +#include +#include +#include +#include + + +#ifndef AL_TASK_THREAD_POOL_HPP +#define AL_TASK_THREAD_POOL_HPP + +// Version macros. +#define TASK_THREAD_POOL_VERSION_MAJOR 1 +#define TASK_THREAD_POOL_VERSION_MINOR 0 +#define TASK_THREAD_POOL_VERSION_PATCH 10 + +#include +#include +#include +#include +#include +#include +#include + +// MSVC does not correctly set the __cplusplus macro by default, so we must read it from _MSVC_LANG +// See https://devblogs.microsoft.com/cppblog/msvc-now-correctly-reports-__cplusplus/ +#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) +#define TTP_CXX17 1 +#else +#define TTP_CXX17 0 +#endif + +#if TTP_CXX17 +#define TTP_NODISCARD [[nodiscard]] +#else +#define TTP_NODISCARD +#endif + +namespace task_thread_pool { + +#if !TTP_CXX17 + /** + * A reimplementation of std::decay_t, which is only available since C++14. + */ + template + using decay_t = typename std::decay::type; +#endif + + /** + * A fast and lightweight thread pool that uses C++11 threads. + */ + class task_thread_pool { + public: + /** + * Create a task_thread_pool and start worker threads. + * + * @param num_threads Number of worker threads. If 0 then number of threads is equal to the + * number of physical cores on the machine, as given by std::thread::hardware_concurrency(). 
+ */ + explicit task_thread_pool(unsigned int num_threads = 0) { + if (num_threads < 1) { + num_threads = std::thread::hardware_concurrency(); + if (num_threads < 1) { num_threads = 1; } + } + start_threads(num_threads); + } + + /** + * Finish all tasks left in the queue then shut down worker threads. + * If the pool is currently paused then it is resumed. + */ + ~task_thread_pool() { + unpause(); + wait_for_queued_tasks(); + stop_all_threads(); + } + + /** + * Drop all tasks that have been submitted but not yet started by a worker. + * + * Tasks already in progress continue executing. + */ + void clear_task_queue() { + const std::lock_guard tasks_lock(task_mutex); + tasks = {}; + } + + /** + * Get number of enqueued tasks. + * + * @return Number of tasks that have been enqueued but not yet started. + */ + TTP_NODISCARD size_t get_num_queued_tasks() const { + const std::lock_guard tasks_lock(task_mutex); + return tasks.size(); + } + + /** + * Get number of in-progress tasks. + * + * @return Approximate number of tasks currently being processed by worker threads. + */ + TTP_NODISCARD size_t get_num_running_tasks() const { + const std::lock_guard tasks_lock(task_mutex); + return num_inflight_tasks; + } + + /** + * Get total number of tasks in the pool. + * + * @return Approximate number of tasks both enqueued and running. + */ + TTP_NODISCARD size_t get_num_tasks() const { + const std::lock_guard tasks_lock(task_mutex); + return tasks.size() + num_inflight_tasks; + } + + /** + * Get number of worker threads. + * + * @return Number of worker threads. + */ + TTP_NODISCARD unsigned int get_num_threads() const { + const std::lock_guard threads_lock(thread_mutex); + return static_cast(threads.size()); + } + + /** + * Set number of worker threads. Will start or stop worker threads as necessary. + * + * @param num_threads Number of worker threads. 
If 0 then number of threads is equal to the + * number of physical cores on the machine, as given by std::thread::hardware_concurrency(). + * @return Previous number of worker threads. + */ + unsigned int set_num_threads(unsigned int num_threads) { + const std::lock_guard threads_lock(thread_mutex); + unsigned int previous_num_threads = get_num_threads(); + + if (num_threads < 1) { + num_threads = std::thread::hardware_concurrency(); + if (num_threads < 1) { num_threads = 1; } + } + + if (previous_num_threads <= num_threads) { + // expanding the thread pool + start_threads(num_threads - previous_num_threads); + } else { + // contracting the thread pool + stop_all_threads(); + { + const std::lock_guard tasks_lock(task_mutex); + pool_running = true; + } + start_threads(num_threads); + } + + return previous_num_threads; + } + + /** + * Stop executing queued tasks. Use `unpause()` to resume. Note: Destroying the pool will implicitly unpause. + * + * Any in-progress tasks continue executing. + */ + void pause() { + const std::lock_guard tasks_lock(task_mutex); + pool_paused = true; + } + + /** + * Resume executing queued tasks. + */ + void unpause() { + const std::lock_guard tasks_lock(task_mutex); + pool_paused = false; + task_cv.notify_all(); + } + + /** + * Check whether the pool is paused. + * + * @return true if pause() has been called without an intervening unpause(). + */ + TTP_NODISCARD bool is_paused() const { + const std::lock_guard tasks_lock(task_mutex); + return pool_paused; + } + + /** + * Submit a Callable for the pool to execute and return a std::future. + * + * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc. + * @param args Arguments for func. Optional. + * @return std::future that can be used to get func's return value or thrown exception. 
+ */ + template , std::decay_t...> +#else + typename R = typename std::result_of(decay_t...)>::type +#endif + > + TTP_NODISCARD std::future submit(F&& func, A&&... args) { +#if defined(_MSC_VER) + // MSVC's packaged_task is not movable even though it should be. + // Discussion about this bug and its future fix: + // https://developercommunity.visualstudio.com/t/unable-to-move-stdpackaged-task-into-any-stl-conta/108672 + std::shared_ptr> ptask = + std::make_shared>(std::bind(std::forward(func), std::forward(args)...)); + submit_detach([ptask] { (*ptask)(); }); + return ptask->get_future(); +#else + std::packaged_task task(std::bind(std::forward(func), std::forward(args)...)); + auto ret = task.get_future(); + submit_detach(std::move(task)); + return ret; +#endif + } + + /** + * Submit a zero-argument Callable for the pool to execute. + * + * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc. + */ + template + void submit_detach(F&& func) { + const std::lock_guard tasks_lock(task_mutex); + tasks.emplace(std::forward(func)); + task_cv.notify_one(); + } + + /** + * Submit a Callable with arguments for the pool to execute. + * + * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc. + */ + template + void submit_detach(F&& func, A&&... args) { + const std::lock_guard tasks_lock(task_mutex); + tasks.emplace(std::bind(std::forward(func), std::forward(args)...)); + task_cv.notify_one(); + } + + /** + * Block until the task queue is empty. Some tasks may be in-progress when this method returns. + */ + void wait_for_queued_tasks() { + std::unique_lock tasks_lock(task_mutex); + notify_task_finish = true; + task_finished_cv.wait(tasks_lock, [&] { return tasks.empty(); }); + notify_task_finish = false; + } + + /** + * Block until all tasks have finished. 
+ */ + void wait_for_tasks() { + std::unique_lock tasks_lock(task_mutex); + notify_task_finish = true; + task_finished_cv.wait(tasks_lock, [&] { return tasks.empty() && num_inflight_tasks == 0; }); + notify_task_finish = false; + } + + protected: + + /** + * Main function for worker threads. + */ + void worker_main() { + bool finished_task = false; + + while (true) { + std::unique_lock tasks_lock(task_mutex); + + if (finished_task) { + --num_inflight_tasks; + if (notify_task_finish) { + task_finished_cv.notify_all(); + } + } + + task_cv.wait(tasks_lock, [&]() { return !pool_running || (!pool_paused && !tasks.empty()); }); + + if (!pool_running) { + break; + } + + // Must mean that (!pool_paused && !tasks.empty()) is true + + std::packaged_task task{std::move(tasks.front())}; + tasks.pop(); + ++num_inflight_tasks; + tasks_lock.unlock(); + + try { + task(); + } catch (...) { + // std::packaged_task::operator() may throw in some error conditions, such as if the task + // had already been run. Nothing that the pool can do anything about. + } + + finished_task = true; + } + } + + /** + * Start worker threads. + * + * @param num_threads How many threads to start. + */ + void start_threads(const unsigned int num_threads) { + const std::lock_guard threads_lock(thread_mutex); + + for (unsigned int i = 0; i < num_threads; ++i) { + threads.emplace_back(&task_thread_pool::worker_main, this); + } + } + + /** + * Stop, join, and destroy all worker threads. + */ + void stop_all_threads() { + const std::lock_guard threads_lock(thread_mutex); + + { + const std::lock_guard tasks_lock(task_mutex); + pool_running = false; + task_cv.notify_all(); + } + + for (auto& thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } + threads.clear(); + } + + /** + * The worker threads. + * + * Access protected by thread_mutex + */ + std::vector threads; + + /** + * A mutex for methods that start/stop threads. 
+ */ + mutable std::recursive_mutex thread_mutex; + + /** + * The task queue. + * + * Access protected by task_mutex. + */ + std::queue> tasks = {}; + + /** + * A mutex for all variables related to tasks. + */ + mutable std::mutex task_mutex; + + /** + * Used to notify changes to the task queue, such as a new task added, pause/unpause, etc. + */ + std::condition_variable task_cv; + + /** + * Used to notify of finished tasks. + */ + std::condition_variable task_finished_cv; + + /** + * A signal for worker threads that the pool is either running or shutting down. + * + * Access protected by task_mutex. + */ + bool pool_running = true; + + /** + * A signal for worker threads to not pull new tasks from the queue. + * + * Access protected by task_mutex. + */ + bool pool_paused = false; + + /** + * A signal for worker threads that they should notify task_finished_cv when they finish a task. + * + * Access protected by task_mutex. + */ + bool notify_task_finish = false; + + /** + * A counter of the number of tasks in-progress by worker threads. + * Incremented when a task is popped off the task queue and decremented when that task is complete. + * + * Access protected by task_mutex. + */ + int num_inflight_tasks = 0; + }; +} + +// clean up +#undef TTP_NODISCARD +#undef TTP_CXX17 + +#endif + +#ifndef POOLSTL_INTERNAL_UTILS_HPP +#define POOLSTL_INTERNAL_UTILS_HPP + +// Version macros. 
+#define POOLSTL_VERSION_MAJOR 0 +#define POOLSTL_VERSION_MINOR 3 +#define POOLSTL_VERSION_PATCH 5 + +#include +#include + +#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) +#define POOLSTL_HAVE_CXX17 1 +#define POOLSTL_NO_DISCARD [[nodiscard]] +#else +#define POOLSTL_HAVE_CXX17 0 +#define POOLSTL_NO_DISCARD +#endif + +#if POOLSTL_HAVE_CXX17 && (!defined(_GLIBCXX_RELEASE) || _GLIBCXX_RELEASE >= 9) +#define POOLSTL_HAVE_CXX17_LIB 1 +#else +#define POOLSTL_HAVE_CXX17_LIB 0 +#endif + +#if __cplusplus >= 201402L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201402L) +#define POOLSTL_HAVE_CXX14 1 +#else +#define POOLSTL_HAVE_CXX14 0 +#endif + +namespace poolstl { + namespace internal { + + inline constexpr std::size_t get_chunk_size(std::size_t num_steps, unsigned int num_threads) { + return (num_steps / num_threads) + ((num_steps % num_threads) > 0 ? 1 : 0); + } + + template + constexpr typename std::iterator_traits::difference_type + get_chunk_size(Iterator first, Iterator last, unsigned int num_threads) { + using diff_t = typename std::iterator_traits::difference_type; + return static_cast(get_chunk_size((std::size_t)std::distance(first, last), num_threads)); + } + + template + constexpr typename std::iterator_traits::difference_type + get_iter_chunk_size(const Iterator& iter, const Iterator& last, + typename std::iterator_traits::difference_type chunk_size) { + return std::min(chunk_size, std::distance(iter, last)); + } + + template + Iterator advanced(Iterator iter, typename std::iterator_traits::difference_type offset) { + Iterator ret = iter; + std::advance(ret, offset); + return ret; + } + + /** + * An iterator wrapper that calls std::future<>::get(). 
+ * @tparam Iterator + */ + template + class getting_iter : public Iterator { + public: + using value_type = decltype((*std::declval()).get()); + using difference_type = typename std::iterator_traits::difference_type; + using pointer = value_type*; + using reference = value_type&; + explicit getting_iter(Iterator iter) : iter(iter) {} + + getting_iter operator++() { ++iter; return *this; } + getting_iter operator++(int) { getting_iter ret(*this); ++iter; return ret; } + + value_type operator*() { return (*iter).get(); } + value_type operator[](difference_type offset) { return iter[offset].get(); } + + bool operator==(const getting_iter &other) const { return iter == other.iter; } + bool operator!=(const getting_iter &other) const { return iter != other.iter; } + + protected: + Iterator iter; + }; + + template + getting_iter get_wrap(Iterator iter) { + return getting_iter(iter); + } + + template + void get_futures(Container& futures) { + for (auto &future: futures) { + future.get(); + } + } + + /** + * Identify a pivot element for quicksort. Chooses the middle element of the range. + */ + template + typename std::iterator_traits::value_type quicksort_pivot(Iterator first, Iterator last) { + return *(std::next(first, std::distance(first, last) / 2)); + } + + /** + * Predicate for std::partition (for quicksort) + */ + template + struct pivot_predicate { + pivot_predicate(Compare comp, const T& pivot) : comp(comp), pivot(pivot) {} + + bool operator()(const T& em) { + return comp(em, pivot); + } + Compare comp; + const T pivot; + }; + + /* + * Some methods are only available with C++17 and up. Reimplement on older standards. 
+ */ +#if POOLSTL_HAVE_CXX17_LIB + namespace cpp17 = std; +#else + namespace cpp17 { + + // std::reduce + + template + Tp reduce(InputIt first, InputIt last, Tp init, BinOp b) { + for (; first != last; ++first) + init = b(init, *first); + return init; + } + + template + typename std::iterator_traits::value_type reduce(InputIt first, InputIt last) { + return reduce(first, last, + typename std::iterator_traits::value_type{}, + std::plus::value_type>()); + } + + // std::transform + + template + OutputIt transform(InputIt first1, InputIt last1, OutputIt d_first, + UnaryOperation unary_op) { + while (first1 != last1) { + *d_first++ = unary_op(*first1++); + } + + return d_first; + } + + template + OutputIt transform(InputIt1 first1, InputIt1 last1, + InputIt2 first2, OutputIt d_first, + BinaryOperation binary_op) { + while (first1 != last1) { + *d_first++ = binary_op(*first1++, *first2++); + } + + return d_first; + } + } +#endif + } +} + +#endif + +namespace poolstl { + + namespace ttp = task_thread_pool; + + namespace execution { + namespace internal { + /** + * Holds the thread pool used by par. + */ + inline std::shared_ptr get_default_pool() { + static std::shared_ptr pool; + static std::once_flag flag; + std::call_once(flag, [&](){ pool = std::make_shared(); }); + return pool; + } + } + + /** + * Base class for all poolSTL policies. + */ + struct poolstl_policy { + }; + + /** + * A sequential policy that simply forwards to the non-policy overload. + */ + struct sequenced_policy : public poolstl_policy { + POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const { + // never called, but must exist for C++11 support + throw std::runtime_error("poolSTL: requested thread pool for seq policy."); + } + + POOLSTL_NO_DISCARD bool par_allowed() const { + return false; + } + }; + + /** + * A parallel policy that can use a user-specified thread pool or a default one. 
+ */ + struct parallel_policy : public poolstl_policy { + parallel_policy() = default; + explicit parallel_policy(ttp::task_thread_pool* on_pool, bool par_ok): on_pool(on_pool), par_ok(par_ok) {} + + parallel_policy on(ttp::task_thread_pool& pool) const { + return parallel_policy{&pool, par_ok}; + } + + parallel_policy par_if(bool call_par) const { + return parallel_policy{on_pool, call_par}; + } + + POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const { + if (on_pool) { + return on_pool; + } else { + return internal::get_default_pool().get(); + } + } + + POOLSTL_NO_DISCARD bool par_allowed() const { + return par_ok; + } + + protected: + ttp::task_thread_pool *on_pool = nullptr; + bool par_ok = true; + }; + + constexpr sequenced_policy seq{}; + constexpr parallel_policy par{}; + + /** + * EXPERIMENTAL: Subject to significant changes or removal. + * Use pure threads for each operation instead of a shared thread pool. + * + * Advantage: + * - Fewer symbols (no packaged_task with its operators, destructors, vtable, etc) means smaller binary + * which can mean a lot when there are many calls. + * - No thread pool to manage. + * + * Disadvantages: + * - Threads are started and joined for every operation, so it is harder to amortize that cost. + * - Barely any algorithms are supported. + */ + struct pure_threads_policy : public poolstl_policy { + explicit pure_threads_policy(unsigned int num_threads, bool par_ok): num_threads(num_threads), + par_ok(par_ok) {} + + POOLSTL_NO_DISCARD unsigned int get_num_threads() const { + if (num_threads == 0) { + return std::thread::hardware_concurrency(); + } + return num_threads; + } + + POOLSTL_NO_DISCARD bool par_allowed() const { + return par_ok; + } + + protected: + unsigned int num_threads = 1; + bool par_ok = true; + }; + + /** + * Choose parallel or sequential at runtime. + * + * @param call_par Whether to use a parallel policy. + * @return `par` if call_par is true, else a sequential policy (like `seq`). 
+ */ + inline parallel_policy par_if(bool call_par) { + return parallel_policy{nullptr, call_par}; + } + + /** + * Choose parallel or sequential at runtime, with pool selection. + * + * @param call_par Whether to use a parallel policy. + * @return `par.on(pool)` if call_par is true, else a sequential policy (like `seq`). + */ + inline parallel_policy par_if(bool call_par, ttp::task_thread_pool& pool) { + return parallel_policy{&pool, call_par}; + } + + /** + * EXPERIMENTAL: Subject to significant changes or removal. See `pure_threads_policy`. + * Choose parallel or sequential at runtime, with thread count selection. + * + * @param call_par Whether to use a parallel policy. + * @return `par.on(pool)` if call_par is true, else `seq`. + */ + inline pure_threads_policy par_if_threads(bool call_par, unsigned int num_threads) { + return pure_threads_policy{num_threads, call_par}; + } + } + + using execution::seq; + using execution::par; + using execution::par_if; + + namespace internal { + /** + * To enable/disable seq overload resolution + */ + template + using enable_if_seq = + typename std::enable_if< + std::is_same::type>::type>::value, + Tp>::type; + + /** + * To enable/disable par overload resolution + */ + template + using enable_if_par = + typename std::enable_if< + std::is_same::type>::type>::value, + Tp>::type; + + /** + * To enable/disable par overload resolution + */ + template + using enable_if_poolstl_policy = + typename std::enable_if< + std::is_base_of::type>::type>::value, + Tp>::type; + + template + bool is_seq(const ExecPolicy& policy) { + return !policy.par_allowed(); + } + + template + using is_pure_threads_policy = std::is_same::type>::type>; + } +} + +#endif + +#ifndef POOLSTL_ALGORITHM_HPP +#define POOLSTL_ALGORITHM_HPP + +#include + + +#ifndef POOLSTL_INTERNAL_TTP_IMPL_HPP +#define POOLSTL_INTERNAL_TTP_IMPL_HPP + +#include +#include +#include +#include + + +#ifndef POOLSTL_EXECUTION_HPP +#define POOLSTL_EXECUTION_HPP + +#include +#include 
+#include +#include + + +#ifndef AL_TASK_THREAD_POOL_HPP +#define AL_TASK_THREAD_POOL_HPP + +// Version macros. +#define TASK_THREAD_POOL_VERSION_MAJOR 1 +#define TASK_THREAD_POOL_VERSION_MINOR 0 +#define TASK_THREAD_POOL_VERSION_PATCH 10 + +#include +#include +#include +#include +#include +#include +#include + +// MSVC does not correctly set the __cplusplus macro by default, so we must read it from _MSVC_LANG +// See https://devblogs.microsoft.com/cppblog/msvc-now-correctly-reports-__cplusplus/ +#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) +#define TTP_CXX17 1 +#else +#define TTP_CXX17 0 +#endif + +#if TTP_CXX17 +#define TTP_NODISCARD [[nodiscard]] +#else +#define TTP_NODISCARD +#endif + +namespace task_thread_pool { + +#if !TTP_CXX17 + /** + * A reimplementation of std::decay_t, which is only available since C++14. + */ + template + using decay_t = typename std::decay::type; +#endif + + /** + * A fast and lightweight thread pool that uses C++11 threads. + */ + class task_thread_pool { + public: + /** + * Create a task_thread_pool and start worker threads. + * + * @param num_threads Number of worker threads. If 0 then number of threads is equal to the + * number of physical cores on the machine, as given by std::thread::hardware_concurrency(). + */ + explicit task_thread_pool(unsigned int num_threads = 0) { + if (num_threads < 1) { + num_threads = std::thread::hardware_concurrency(); + if (num_threads < 1) { num_threads = 1; } + } + start_threads(num_threads); + } + + /** + * Finish all tasks left in the queue then shut down worker threads. + * If the pool is currently paused then it is resumed. + */ + ~task_thread_pool() { + unpause(); + wait_for_queued_tasks(); + stop_all_threads(); + } + + /** + * Drop all tasks that have been submitted but not yet started by a worker. + * + * Tasks already in progress continue executing. 
+ */ + void clear_task_queue() { + const std::lock_guard tasks_lock(task_mutex); + tasks = {}; + } + + /** + * Get number of enqueued tasks. + * + * @return Number of tasks that have been enqueued but not yet started. + */ + TTP_NODISCARD size_t get_num_queued_tasks() const { + const std::lock_guard tasks_lock(task_mutex); + return tasks.size(); + } + + /** + * Get number of in-progress tasks. + * + * @return Approximate number of tasks currently being processed by worker threads. + */ + TTP_NODISCARD size_t get_num_running_tasks() const { + const std::lock_guard tasks_lock(task_mutex); + return num_inflight_tasks; + } + + /** + * Get total number of tasks in the pool. + * + * @return Approximate number of tasks both enqueued and running. + */ + TTP_NODISCARD size_t get_num_tasks() const { + const std::lock_guard tasks_lock(task_mutex); + return tasks.size() + num_inflight_tasks; + } + + /** + * Get number of worker threads. + * + * @return Number of worker threads. + */ + TTP_NODISCARD unsigned int get_num_threads() const { + const std::lock_guard threads_lock(thread_mutex); + return static_cast(threads.size()); + } + + /** + * Set number of worker threads. Will start or stop worker threads as necessary. + * + * @param num_threads Number of worker threads. If 0 then number of threads is equal to the + * number of physical cores on the machine, as given by std::thread::hardware_concurrency(). + * @return Previous number of worker threads. 
+ */ + unsigned int set_num_threads(unsigned int num_threads) { + const std::lock_guard threads_lock(thread_mutex); + unsigned int previous_num_threads = get_num_threads(); + + if (num_threads < 1) { + num_threads = std::thread::hardware_concurrency(); + if (num_threads < 1) { num_threads = 1; } + } + + if (previous_num_threads <= num_threads) { + // expanding the thread pool + start_threads(num_threads - previous_num_threads); + } else { + // contracting the thread pool + stop_all_threads(); + { + const std::lock_guard tasks_lock(task_mutex); + pool_running = true; + } + start_threads(num_threads); + } + + return previous_num_threads; + } + + /** + * Stop executing queued tasks. Use `unpause()` to resume. Note: Destroying the pool will implicitly unpause. + * + * Any in-progress tasks continue executing. + */ + void pause() { + const std::lock_guard tasks_lock(task_mutex); + pool_paused = true; + } + + /** + * Resume executing queued tasks. + */ + void unpause() { + const std::lock_guard tasks_lock(task_mutex); + pool_paused = false; + task_cv.notify_all(); + } + + /** + * Check whether the pool is paused. + * + * @return true if pause() has been called without an intervening unpause(). + */ + TTP_NODISCARD bool is_paused() const { + const std::lock_guard tasks_lock(task_mutex); + return pool_paused; + } + + /** + * Submit a Callable for the pool to execute and return a std::future. + * + * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc. + * @param args Arguments for func. Optional. + * @return std::future that can be used to get func's return value or thrown exception. + */ + template , std::decay_t...> +#else + typename R = typename std::result_of(decay_t...)>::type +#endif + > + TTP_NODISCARD std::future submit(F&& func, A&&... args) { +#if defined(_MSC_VER) + // MSVC's packaged_task is not movable even though it should be. 
+ // Discussion about this bug and its future fix: + // https://developercommunity.visualstudio.com/t/unable-to-move-stdpackaged-task-into-any-stl-conta/108672 + std::shared_ptr> ptask = + std::make_shared>(std::bind(std::forward(func), std::forward(args)...)); + submit_detach([ptask] { (*ptask)(); }); + return ptask->get_future(); +#else + std::packaged_task task(std::bind(std::forward(func), std::forward(args)...)); + auto ret = task.get_future(); + submit_detach(std::move(task)); + return ret; +#endif + } + + /** + * Submit a zero-argument Callable for the pool to execute. + * + * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc. + */ + template + void submit_detach(F&& func) { + const std::lock_guard tasks_lock(task_mutex); + tasks.emplace(std::forward(func)); + task_cv.notify_one(); + } + + /** + * Submit a Callable with arguments for the pool to execute. + * + * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc. + * @param args Arguments for func. They are bound to func with std::bind when the task is enqueued. + */ + template + void submit_detach(F&& func, A&&... args) { + const std::lock_guard tasks_lock(task_mutex); + tasks.emplace(std::bind(std::forward(func), std::forward(args)...)); + task_cv.notify_one(); + } + + /** + * Block until the task queue is empty. Some tasks may be in-progress when this method returns. + */ + void wait_for_queued_tasks() { + std::unique_lock tasks_lock(task_mutex); + notify_task_finish = true; + task_finished_cv.wait(tasks_lock, [&] { return tasks.empty(); }); + notify_task_finish = false; + } + + /** + * Block until all tasks have finished. + */ + void wait_for_tasks() { + std::unique_lock tasks_lock(task_mutex); + notify_task_finish = true; + task_finished_cv.wait(tasks_lock, [&] { return tasks.empty() && num_inflight_tasks == 0; }); + notify_task_finish = false; + } + + protected: + + /** + * Main function for worker threads.
+ */ + void worker_main() { + bool finished_task = false; + + while (true) { + std::unique_lock tasks_lock(task_mutex); + + if (finished_task) { + --num_inflight_tasks; + if (notify_task_finish) { + task_finished_cv.notify_all(); + } + } + + task_cv.wait(tasks_lock, [&]() { return !pool_running || (!pool_paused && !tasks.empty()); }); + + if (!pool_running) { + break; + } + + // Must mean that (!pool_paused && !tasks.empty()) is true + + std::packaged_task task{std::move(tasks.front())}; + tasks.pop(); + ++num_inflight_tasks; + tasks_lock.unlock(); + + try { + task(); + } catch (...) { + // std::packaged_task::operator() may throw in some error conditions, such as if the task + // had already been run. Nothing that the pool can do anything about. + } + + finished_task = true; + } + } + + /** + * Start worker threads. + * + * @param num_threads How many threads to start. + */ + void start_threads(const unsigned int num_threads) { + const std::lock_guard threads_lock(thread_mutex); + + for (unsigned int i = 0; i < num_threads; ++i) { + threads.emplace_back(&task_thread_pool::worker_main, this); + } + } + + /** + * Stop, join, and destroy all worker threads. + */ + void stop_all_threads() { + const std::lock_guard threads_lock(thread_mutex); + + { + const std::lock_guard tasks_lock(task_mutex); + pool_running = false; + task_cv.notify_all(); + } + + for (auto& thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } + threads.clear(); + } + + /** + * The worker threads. + * + * Access protected by thread_mutex + */ + std::vector threads; + + /** + * A mutex for methods that start/stop threads. + */ + mutable std::recursive_mutex thread_mutex; + + /** + * The task queue. + * + * Access protected by task_mutex. + */ + std::queue> tasks = {}; + + /** + * A mutex for all variables related to tasks. + */ + mutable std::mutex task_mutex; + + /** + * Used to notify changes to the task queue, such as a new task added, pause/unpause, etc. 
+ */ + std::condition_variable task_cv; + + /** + * Used to notify of finished tasks. + */ + std::condition_variable task_finished_cv; + + /** + * A signal for worker threads that the pool is either running or shutting down. + * + * Access protected by task_mutex. + */ + bool pool_running = true; + + /** + * A signal for worker threads to not pull new tasks from the queue. + * + * Access protected by task_mutex. + */ + bool pool_paused = false; + + /** + * A signal for worker threads that they should notify task_finished_cv when they finish a task. + * + * Access protected by task_mutex. + */ + bool notify_task_finish = false; + + /** + * A counter of the number of tasks in-progress by worker threads. + * Incremented when a task is popped off the task queue and decremented when that task is complete. + * + * Access protected by task_mutex. + */ + int num_inflight_tasks = 0; + }; +} + +// clean up +#undef TTP_NODISCARD +#undef TTP_CXX17 + +#endif + +#ifndef POOLSTL_INTERNAL_UTILS_HPP +#define POOLSTL_INTERNAL_UTILS_HPP + +// Version macros. +#define POOLSTL_VERSION_MAJOR 0 +#define POOLSTL_VERSION_MINOR 3 +#define POOLSTL_VERSION_PATCH 5 + +#include +#include + +#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) +#define POOLSTL_HAVE_CXX17 1 +#define POOLSTL_NO_DISCARD [[nodiscard]] +#else +#define POOLSTL_HAVE_CXX17 0 +#define POOLSTL_NO_DISCARD +#endif + +#if POOLSTL_HAVE_CXX17 && (!defined(_GLIBCXX_RELEASE) || _GLIBCXX_RELEASE >= 9) +#define POOLSTL_HAVE_CXX17_LIB 1 +#else +#define POOLSTL_HAVE_CXX17_LIB 0 +#endif + +#if __cplusplus >= 201402L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201402L) +#define POOLSTL_HAVE_CXX14 1 +#else +#define POOLSTL_HAVE_CXX14 0 +#endif + +namespace poolstl { + namespace internal { + + inline constexpr std::size_t get_chunk_size(std::size_t num_steps, unsigned int num_threads) { + return (num_steps / num_threads) + ((num_steps % num_threads) > 0 ? 
1 : 0); + } + + template + constexpr typename std::iterator_traits::difference_type + get_chunk_size(Iterator first, Iterator last, unsigned int num_threads) { + using diff_t = typename std::iterator_traits::difference_type; + return static_cast(get_chunk_size((std::size_t)std::distance(first, last), num_threads)); + } + + template + constexpr typename std::iterator_traits::difference_type + get_iter_chunk_size(const Iterator& iter, const Iterator& last, + typename std::iterator_traits::difference_type chunk_size) { + return std::min(chunk_size, std::distance(iter, last)); + } + + template + Iterator advanced(Iterator iter, typename std::iterator_traits::difference_type offset) { + Iterator ret = iter; + std::advance(ret, offset); + return ret; + } + + /** + * An iterator wrapper that calls std::future<>::get(). + * @tparam Iterator + */ + template + class getting_iter : public Iterator { + public: + using value_type = decltype((*std::declval()).get()); + using difference_type = typename std::iterator_traits::difference_type; + using pointer = value_type*; + using reference = value_type&; + explicit getting_iter(Iterator iter) : iter(iter) {} + + getting_iter operator++() { ++iter; return *this; } + getting_iter operator++(int) { getting_iter ret(*this); ++iter; return ret; } + + value_type operator*() { return (*iter).get(); } + value_type operator[](difference_type offset) { return iter[offset].get(); } + + bool operator==(const getting_iter &other) const { return iter == other.iter; } + bool operator!=(const getting_iter &other) const { return iter != other.iter; } + + protected: + Iterator iter; + }; + + template + getting_iter get_wrap(Iterator iter) { + return getting_iter(iter); + } + + template + void get_futures(Container& futures) { + for (auto &future: futures) { + future.get(); + } + } + + /** + * Identify a pivot element for quicksort. Chooses the middle element of the range. 
+ */ + template + typename std::iterator_traits::value_type quicksort_pivot(Iterator first, Iterator last) { + return *(std::next(first, std::distance(first, last) / 2)); + } + + /** + * Predicate for std::partition (for quicksort) + */ + template + struct pivot_predicate { + pivot_predicate(Compare comp, const T& pivot) : comp(comp), pivot(pivot) {} + + bool operator()(const T& em) { + return comp(em, pivot); + } + Compare comp; + const T pivot; + }; + + /* + * Some methods are only available with C++17 and up. Reimplement on older standards. + */ +#if POOLSTL_HAVE_CXX17_LIB + namespace cpp17 = std; +#else + namespace cpp17 { + + // std::reduce + + template + Tp reduce(InputIt first, InputIt last, Tp init, BinOp b) { + for (; first != last; ++first) + init = b(init, *first); + return init; + } + + template + typename std::iterator_traits::value_type reduce(InputIt first, InputIt last) { + return reduce(first, last, + typename std::iterator_traits::value_type{}, + std::plus::value_type>()); + } + + // std::transform + + template + OutputIt transform(InputIt first1, InputIt last1, OutputIt d_first, + UnaryOperation unary_op) { + while (first1 != last1) { + *d_first++ = unary_op(*first1++); + } + + return d_first; + } + + template + OutputIt transform(InputIt1 first1, InputIt1 last1, + InputIt2 first2, OutputIt d_first, + BinaryOperation binary_op) { + while (first1 != last1) { + *d_first++ = binary_op(*first1++, *first2++); + } + + return d_first; + } + } +#endif + } +} + +#endif + +namespace poolstl { + + namespace ttp = task_thread_pool; + + namespace execution { + namespace internal { + /** + * Holds the thread pool used by par. + */ + inline std::shared_ptr get_default_pool() { + static std::shared_ptr pool; + static std::once_flag flag; + std::call_once(flag, [&](){ pool = std::make_shared(); }); + return pool; + } + } + + /** + * Base class for all poolSTL policies. 
+ */ + struct poolstl_policy { + }; + + /** + * A sequential policy that simply forwards to the non-policy overload. + */ + struct sequenced_policy : public poolstl_policy { + POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const { + // never called, but must exist for C++11 support + throw std::runtime_error("poolSTL: requested thread pool for seq policy."); + } + + POOLSTL_NO_DISCARD bool par_allowed() const { + return false; + } + }; + + /** + * A parallel policy that can use a user-specified thread pool or a default one. + */ + struct parallel_policy : public poolstl_policy { + parallel_policy() = default; + explicit parallel_policy(ttp::task_thread_pool* on_pool, bool par_ok): on_pool(on_pool), par_ok(par_ok) {} + + parallel_policy on(ttp::task_thread_pool& pool) const { + return parallel_policy{&pool, par_ok}; + } + + parallel_policy par_if(bool call_par) const { + return parallel_policy{on_pool, call_par}; + } + + POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const { + if (on_pool) { + return on_pool; + } else { + return internal::get_default_pool().get(); + } + } + + POOLSTL_NO_DISCARD bool par_allowed() const { + return par_ok; + } + + protected: + ttp::task_thread_pool *on_pool = nullptr; + bool par_ok = true; + }; + + constexpr sequenced_policy seq{}; + constexpr parallel_policy par{}; + + /** + * EXPERIMENTAL: Subject to significant changes or removal. + * Use pure threads for each operation instead of a shared thread pool. + * + * Advantage: + * - Fewer symbols (no packaged_task with its operators, destructors, vtable, etc) means smaller binary + * which can mean a lot when there are many calls. + * - No thread pool to manage. + * + * Disadvantages: + * - Threads are started and joined for every operation, so it is harder to amortize that cost. + * - Barely any algorithms are supported. 
+ */ + struct pure_threads_policy : public poolstl_policy { + explicit pure_threads_policy(unsigned int num_threads, bool par_ok): num_threads(num_threads), + par_ok(par_ok) {} + + POOLSTL_NO_DISCARD unsigned int get_num_threads() const { + if (num_threads == 0) { + return std::thread::hardware_concurrency(); + } + return num_threads; + } + + POOLSTL_NO_DISCARD bool par_allowed() const { + return par_ok; + } + + protected: + unsigned int num_threads = 1; + bool par_ok = true; + }; + + /** + * Choose parallel or sequential at runtime. + * + * @param call_par Whether to use a parallel policy. + * @return `par` if call_par is true, else a sequential policy (like `seq`). + */ + inline parallel_policy par_if(bool call_par) { + return parallel_policy{nullptr, call_par}; + } + + /** + * Choose parallel or sequential at runtime, with pool selection. + * + * @param call_par Whether to use a parallel policy. + * @param pool The thread pool that the returned policy refers to. + * @return `par.on(pool)` if call_par is true, else a sequential policy (like `seq`). + */ + inline parallel_policy par_if(bool call_par, ttp::task_thread_pool& pool) { + return parallel_policy{&pool, call_par}; + } + + /** + * EXPERIMENTAL: Subject to significant changes or removal. See `pure_threads_policy`. + * Choose parallel or sequential at runtime, with thread count selection. + * + * @param call_par Whether to use a parallel policy. + * @param num_threads Number of threads for the policy. 0 means std::thread::hardware_concurrency(). + * @return A `pure_threads_policy` that allows parallel execution if call_par is true, else a sequential one.
+ */ + inline pure_threads_policy par_if_threads(bool call_par, unsigned int num_threads) { + return pure_threads_policy{num_threads, call_par}; + } + } + + using execution::seq; + using execution::par; + using execution::par_if; + + namespace internal { + /** + * To enable/disable seq overload resolution + */ + template + using enable_if_seq = + typename std::enable_if< + std::is_same::type>::type>::value, + Tp>::type; + + /** + * To enable/disable par overload resolution + */ + template + using enable_if_par = + typename std::enable_if< + std::is_same::type>::type>::value, + Tp>::type; + + /** + * To enable/disable par overload resolution + */ + template + using enable_if_poolstl_policy = + typename std::enable_if< + std::is_base_of::type>::type>::value, + Tp>::type; + + template + bool is_seq(const ExecPolicy& policy) { + return !policy.par_allowed(); + } + + template + using is_pure_threads_policy = std::is_same::type>::type>; + } +} + +#endif + +namespace poolstl { + namespace internal { + +#if POOLSTL_HAVE_CXX17_LIB + /** + * Call std::apply in parallel. + */ + template + std::vector> + parallel_apply(ExecPolicy &&policy, Op op, const ArgContainer& args_list) { + std::vector> futures; + auto& task_pool = *policy.pool(); + + for (const auto& args : args_list) { + futures.emplace_back(task_pool.submit([](Op op, const auto& args_fwd) { + std::apply(op, args_fwd); + }, op, args)); + } + + return futures; + } +#endif + + /** + * Chunk a single range, with autodetected return types. 
+ */ + template ()(std::declval(), std::declval()))> + std::vector> + parallel_chunk_for_gen(ExecPolicy &&policy, RandIt first, RandIt last, Chunk chunk, + ChunkRet* = (decltype(std::declval()(std::declval(), + std::declval()))*)nullptr, + int extra_split_factor = 1) { + std::vector> futures; + auto& task_pool = *policy.pool(); + auto chunk_size = get_chunk_size(first, last, extra_split_factor * task_pool.get_num_threads()); + + while (first < last) { + auto iter_chunk_size = get_iter_chunk_size(first, last, chunk_size); + RandIt loop_end = advanced(first, iter_chunk_size); + + futures.emplace_back(task_pool.submit(chunk, first, loop_end)); + + first = loop_end; + } + + return futures; + } + + /** + * Chunk a single range. + */ + template + std::vector> + parallel_chunk_for_1(ExecPolicy &&policy, RandIt first, RandIt last, + Chunk chunk, ChunkRet*, int extra_split_factor, A&&... chunk_args) { + std::vector> futures; + auto& task_pool = *policy.pool(); + auto chunk_size = get_chunk_size(first, last, extra_split_factor * task_pool.get_num_threads()); + + while (first < last) { + auto iter_chunk_size = get_iter_chunk_size(first, last, chunk_size); + RandIt loop_end = advanced(first, iter_chunk_size); + + futures.emplace_back(task_pool.submit(chunk, first, loop_end, chunk_args...)); + + first = loop_end; + } + + return futures; + } + + /** + * Chunk a single range. + */ + template + typename std::enable_if::value, void>::type + parallel_chunk_for_1_wait(ExecPolicy &&policy, RandIt first, RandIt last, + Chunk chunk, ChunkRet* rettype, int extra_split_factor, A&&... chunk_args) { + auto futures = parallel_chunk_for_1(std::forward(policy), first, last, + chunk, rettype, extra_split_factor, chunk_args...); + get_futures(futures); + } + + /** + * Element-wise chunk two ranges. + */ + template + std::vector> + parallel_chunk_for_2(ExecPolicy &&policy, RandIt1 first1, RandIt1 last1, RandIt2 first2, + Chunk chunk, ChunkRet*, A&&... 
chunk_args) { + std::vector> futures; + auto& task_pool = *policy.pool(); + auto chunk_size = get_chunk_size(first1, last1, task_pool.get_num_threads()); + + while (first1 < last1) { + auto iter_chunk_size = get_iter_chunk_size(first1, last1, chunk_size); + RandIt1 loop_end = advanced(first1, iter_chunk_size); + + futures.emplace_back(task_pool.submit(chunk, first1, loop_end, first2, chunk_args...)); + + first1 = loop_end; + std::advance(first2, iter_chunk_size); + } + + return futures; + } + + /** + * Element-wise chunk three ranges. + */ + template + std::vector> + parallel_chunk_for_3(ExecPolicy &&policy, RandIt1 first1, RandIt1 last1, RandIt2 first2, RandIt3 first3, + Chunk chunk, ChunkRet*, A&&... chunk_args) { + std::vector> futures; + auto& task_pool = *policy.pool(); + auto chunk_size = get_chunk_size(first1, last1, task_pool.get_num_threads()); + + while (first1 < last1) { + auto iter_chunk_size = get_iter_chunk_size(first1, last1, chunk_size); + RandIt1 loop_end = advanced(first1, iter_chunk_size); + + futures.emplace_back(task_pool.submit(chunk, first1, loop_end, first2, first3, chunk_args...)); + + first1 = loop_end; + std::advance(first2, iter_chunk_size); + std::advance(first3, iter_chunk_size); + } + + return futures; + } + + /** + * Sort a range in parallel. 
+ * + * @param sort_func Sequential sort method, like std::sort or std::stable_sort + * @param merge_func Sequential merge method, like std::inplace_merge + */ + template + void parallel_mergesort(ExecPolicy &&policy, RandIt first, RandIt last, + Compare comp, SortFunc sort_func, MergeFunc merge_func) { + if (first == last) { + return; + } + + // Sort chunks in parallel + auto futures = parallel_chunk_for_gen(std::forward(policy), first, last, + [&comp, sort_func] (RandIt chunk_first, RandIt chunk_last) { + sort_func(chunk_first, chunk_last, comp); + return std::make_pair(chunk_first, chunk_last); + }); + + // Merge the sorted ranges + using SortedRange = std::pair; + auto& task_pool = *policy.pool(); + std::vector subranges; + do { + for (auto& future : futures) { + subranges.emplace_back(future.get()); + } + futures.clear(); + + for (std::size_t i = 0; i < subranges.size(); ++i) { + if (i + 1 < subranges.size()) { + // pair up and merge + auto& lhs = subranges[i]; + auto& rhs = subranges[i + 1]; + futures.emplace_back(task_pool.submit([&comp, merge_func] (RandIt chunk_first, + RandIt chunk_middle, + RandIt chunk_last) { + merge_func(chunk_first, chunk_middle, chunk_last, comp); + return std::make_pair(chunk_first, chunk_last); + }, lhs.first, lhs.second, rhs.second)); + ++i; + } else { + // forward the final extra range + std::promise p; + futures.emplace_back(p.get_future()); + p.set_value(subranges[i]); + } + } + + subranges.clear(); + } while (futures.size() > 1); + futures.front().get(); + } + + /** + * Quicksort worker function. 
+ */ + template + void quicksort_impl(task_thread_pool::task_thread_pool* task_pool, const RandIt first, const RandIt last, + Compare comp, SortFunc sort_func, PartFunc part_func, PivotFunc pivot_func, + std::ptrdiff_t target_leaf_size, + std::vector>* futures, std::mutex* mutex, + std::condition_variable* cv, int* inflight_spawns) { + using T = typename std::iterator_traits::value_type; + + auto partition_size = std::distance(first, last); + + if (partition_size > target_leaf_size) { + // partition the range + auto mid = part_func(first, last, pivot_predicate(comp, pivot_func(first, last))); + + if (mid != first && mid != last) { + // was able to partition the range, so recurse + std::lock_guard guard(*mutex); + ++(*inflight_spawns); + + futures->emplace_back(task_pool->submit( + quicksort_impl, + task_pool, first, mid, comp, sort_func, part_func, pivot_func, target_leaf_size, + futures, mutex, cv, inflight_spawns)); + + futures->emplace_back(task_pool->submit( + quicksort_impl, + task_pool, mid, last, comp, sort_func, part_func, pivot_func, target_leaf_size, + futures, mutex, cv, inflight_spawns)); + return; + } + } + + // Range does not need to be subdivided (or was unable to subdivide). Run the sequential sort. + { + // notify main thread that partitioning may be finished + std::lock_guard guard(*mutex); + --(*inflight_spawns); + } + cv->notify_one(); + + sort_func(first, last, comp); + } + + /** + * Sort a range in parallel using quicksort. + * + * @param sort_func Sequential sort method, like std::sort or std::stable_sort + * @param part_func Method that partitions a range, like std::partition or std::stable_partition + * @param pivot_func Method that identifies the pivot + */ + template + void parallel_quicksort(ExecPolicy &&policy, RandIt first, RandIt last, + Compare comp, SortFunc sort_func, PartFunc part_func, PivotFunc pivot_func) { + if (first == last) { + return; + } + + auto& task_pool = *policy.pool(); + + // Target partition size. 
Range will be recursively partitioned into partitions no bigger than this + // size. Target approximately twice as many partitions as threads to reduce impact of uneven pivot + // selection. + auto num_threads = task_pool.get_num_threads(); + std::ptrdiff_t target_leaf_size = std::max(std::distance(first, last) / (num_threads * 2), + (std::ptrdiff_t)5); + + if (num_threads == 1) { + target_leaf_size = std::distance(first, last); + } + + // task_thread_pool does not support creating task DAGs, so organize the code such that + // all parallel tasks are independent. The parallel tasks can spawn additional parallel tasks, and they + // record their "child" task's std::future into a common vector to be waited on by the main thread. + std::mutex mutex; + + // Futures of parallel tasks. Access protected by mutex. + std::vector> futures; + + // For signaling that all partitioning has been completed and futures vector is complete. Uses mutex. + std::condition_variable cv; + + // Number of `quicksort_impl` calls that haven't finished yet. Nonzero value means futures vector may + // still be modified. Access protected by mutex. + int inflight_spawns = 1; + + // Root task. + quicksort_impl(&task_pool, first, last, comp, sort_func, part_func, pivot_func, target_leaf_size, + &futures, &mutex, &cv, &inflight_spawns); + + // Wait for all partitioning to finish. + { + std::unique_lock lock(mutex); + cv.wait(lock, [&] { return inflight_spawns == 0; }); + } + + // Wait on all the parallel tasks. + get_futures(futures); + } + + /** + * Partition range according to predicate. Unstable. + * + * This implementation only parallelizes with p=2; will spawn and wait for only one task. 
+ */ + template + RandIt partition_p2(task_thread_pool::task_thread_pool &task_pool, RandIt first, RandIt last, Predicate pred) { + auto range_size = std::distance(first, last); + if (range_size < 4) { + return std::partition(first, last, pred); + } + + // approach should be generalizable to arbitrary p + + RandIt mid = std::next(first + range_size / 2); + + // partition left and right halves in parallel + auto left_future = task_pool.submit(std::partition, first, mid, pred); + RandIt right_mid = std::partition(mid, last, pred); + RandIt left_mid = left_future.get(); + + // merge the two partitioned halves + auto left_highs_size = std::distance(left_mid, mid); + auto right_lows_size = std::distance(mid, right_mid); + if (left_highs_size <= right_lows_size) { + std::swap_ranges(left_mid, mid, right_mid - left_highs_size); + return right_mid - left_highs_size; + } else { + std::swap_ranges(mid, right_mid, left_mid); + return left_mid + right_lows_size; + } + } + } +} + +#endif + +#ifndef POOLSTL_INTERNAL_THREAD_IMPL_HPP +#define POOLSTL_INTERNAL_THREAD_IMPL_HPP + +/** + * EXPERIMENTAL: Subject to significant changes or removal. + * An implementation using only std::thread and no thread pool at all. + * + * Advantage: + * - Fewer symbols (no packaged_task with its operators, destructors, vtable, etc) means smaller binary + * which can mean a lot when there are many calls like with many templates. + * - No thread pool to manage. + * + * Disadvantages: + * - Threads are started and joined for every operation, so it is harder to amortize that cost. + * - Barely any algorithms are supported. + */ + + + +namespace poolstl { + namespace internal { + + template + typename std::enable_if::value, void>::type + parallel_chunk_for_1_wait(ExecPolicy &&policy, RandIt first, RandIt last, + Chunk chunk, ChunkRet*, int extra_split_factor, A&&... 
chunk_args) { + std::vector threads; + auto chunk_size = get_chunk_size(first, last, extra_split_factor * policy.get_num_threads()); + + while (first < last) { + auto iter_chunk_size = get_iter_chunk_size(first, last, chunk_size); + RandIt loop_end = advanced(first, iter_chunk_size); + + threads.emplace_back(std::thread(chunk, first, loop_end, chunk_args...)); + + first = loop_end; + } + + for (auto& thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } + } + } +} + +#endif + +namespace poolstl { + /** + * NOTE: Iterators are expected to be random access. + * + * Like `std::sort`, but allows specifying the sequential sort method, which must have the + * same signature as the comparator version of `std::sort`. + * + * Implemented as a high-level quicksort that delegates to `sort_func`, in parallel, once the range has been + * sufficiently partitioned. + */ + template + poolstl::internal::enable_if_poolstl_policy + pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp, + void (sort_func)(RandIt, RandIt, Compare) = std::sort) { + if (poolstl::internal::is_seq(policy)) { + sort_func(first, last, comp); + return; + } + + // Parallel partition. + // The partition_p2 method spawns and waits for its own child task. A deadlock is possible if all worker + // threads are waiting for tasks that in turn have no workers to execute them. This is only an issue because + // our thread pool does not have the concept of dependencies.
+ // So ensure that only the first (number of worker threads / 2) partition calls use partition_p2; + // every later call falls back to the sequential std::partition. + auto& task_pool = *policy.pool(); + std::atomic allowed_parallel_partitions{(int)task_pool.get_num_threads() / 2}; + + auto part_func = [&task_pool, &allowed_parallel_partitions](RandIt chunk_first, RandIt chunk_last, + poolstl::internal::pivot_predicate::value_type> pred) { + if (allowed_parallel_partitions.fetch_sub(1) > 0) { + return poolstl::internal::partition_p2(task_pool, chunk_first, chunk_last, pred); + } else { + return std::partition(chunk_first, chunk_last, pred); + } + }; + + poolstl::internal::parallel_quicksort(std::forward(policy), first, last, comp, sort_func, part_func, + poolstl::internal::quicksort_pivot); + } + + /** + * NOTE: Iterators are expected to be random access. + * + * Like `std::sort`, but allows specifying the sequential sort method, which must have the + * same signature as the comparator version of `std::sort`. + * + * Implemented as a parallel high-level quicksort that delegates to `sort_func` once the range has been + * sufficiently partitioned. + */ + template + poolstl::internal::enable_if_poolstl_policy + pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last, + void (sort_func)(RandIt, RandIt, + std::less::value_type>) = std::sort){ + using T = typename std::iterator_traits::value_type; + pluggable_sort(std::forward(policy), first, last, std::less(), sort_func); + } +} + +namespace std { + + /** + * NOTE: Iterators are expected to be random access.
+ * See std::copy https://en.cppreference.com/w/cpp/algorithm/copy + */ + template + poolstl::internal::enable_if_poolstl_policy + copy(ExecPolicy &&policy, RandIt1 first, RandIt1 last, RandIt2 dest) { + if (poolstl::internal::is_seq(policy)) { + return std::copy(first, last, dest); + } + + auto futures = poolstl::internal::parallel_chunk_for_2(std::forward(policy), first, last, dest, + std::copy, (RandIt2*)nullptr); + poolstl::internal::get_futures(futures); + return poolstl::internal::advanced(dest, std::distance(first, last)); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::copy_n https://en.cppreference.com/w/cpp/algorithm/copy_n + */ + template + poolstl::internal::enable_if_poolstl_policy + copy_n(ExecPolicy &&policy, RandIt1 first, Size n, RandIt2 dest) { + if (n <= 0) { + return dest; + } + RandIt1 last = poolstl::internal::advanced(first, n); + std::copy(std::forward(policy), first, last, dest); + return poolstl::internal::advanced(dest, n); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::count_if https://en.cppreference.com/w/cpp/algorithm/count_if + */ + template + poolstl::internal::enable_if_poolstl_policy::difference_type> + count_if(ExecPolicy&& policy, RandIt first, RandIt last, UnaryPredicate p) { + if (poolstl::internal::is_seq(policy)) { + return std::count_if(first, last, p); + } + + using T = typename iterator_traits::difference_type; + + auto futures = poolstl::internal::parallel_chunk_for_1(std::forward(policy), first, last, + std::count_if, + (T*)nullptr, 1, p); + + return poolstl::internal::cpp17::reduce( + poolstl::internal::get_wrap(futures.begin()), + poolstl::internal::get_wrap(futures.end()), (T)0, std::plus()); + } + + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::count https://en.cppreference.com/w/cpp/algorithm/count + */ + template + poolstl::internal::enable_if_poolstl_policy::difference_type> + count(ExecPolicy&& policy, RandIt first, RandIt last, const T& value) { + return std::count_if(std::forward(policy), first, last, + [&value](const T& test) { return test == value; }); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::fill https://en.cppreference.com/w/cpp/algorithm/fill + */ + template + poolstl::internal::enable_if_poolstl_policy + fill(ExecPolicy &&policy, RandIt first, RandIt last, const Tp& value) { + if (poolstl::internal::is_seq(policy)) { + std::fill(first, last, value); + return; + } + + poolstl::internal::parallel_chunk_for_1_wait(std::forward(policy), first, last, + std::fill, (void*)nullptr, 1, value); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::fill_n https://en.cppreference.com/w/cpp/algorithm/fill_n + */ + template + poolstl::internal::enable_if_poolstl_policy + fill_n(ExecPolicy &&policy, RandIt first, Size n, const Tp& value) { + if (n <= 0) { + return first; + } + RandIt last = poolstl::internal::advanced(first, n); + std::fill(std::forward(policy), first, last, value); + return last; + } + + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::find_if https://en.cppreference.com/w/cpp/algorithm/find_if + */ + template + poolstl::internal::enable_if_poolstl_policy + find_if(ExecPolicy &&policy, RandIt first, RandIt last, UnaryPredicate p) { + if (poolstl::internal::is_seq(policy)) { + return std::find_if(first, last, p); + } + + using diff_t = typename std::iterator_traits::difference_type; + diff_t n = std::distance(first, last); + std::atomic extremum(n); + + poolstl::internal::parallel_chunk_for_1_wait(std::forward(policy), first, last, + [&first, &extremum, &p](RandIt chunk_first, RandIt chunk_last) { + if (std::distance(first, chunk_first) > extremum) { + // already found by another task + return; + } + + RandIt chunk_res = std::find_if(chunk_first, chunk_last, p); + if (chunk_res != chunk_last) { + // Found, update extremum using a priority update CAS, as discussed in + // "Reducing Contention Through Priority Updates", PPoPP '13 + const diff_t k = std::distance(first, chunk_res); + for (diff_t old = extremum; k < old; old = extremum) { + extremum.compare_exchange_weak(old, k); + } + } + }, (void*)nullptr, + 8); // use small tasks so later ones may exit early if item is already found + return extremum == n ? last : first + extremum; + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::find_if_not https://en.cppreference.com/w/cpp/algorithm/find_if_not + */ + template + poolstl::internal::enable_if_poolstl_policy + find_if_not(ExecPolicy &&policy, RandIt first, RandIt last, UnaryPredicate p) { + return std::find_if(std::forward(policy), first, last, +#if POOLSTL_HAVE_CXX17_LIB + std::not_fn(p) +#else + [&p](const typename std::iterator_traits::value_type& test) { return !p(test); } +#endif + ); + } + + /** + * NOTE: Iterators are expected to be random access.
+ * See std::find https://en.cppreference.com/w/cpp/algorithm/find + */ + template + poolstl::internal::enable_if_poolstl_policy + find(ExecPolicy &&policy, RandIt first, RandIt last, const T& value) { + return std::find_if(std::forward(policy), first, last, + [&value](const T& test) { return value == test; }); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::for_each https://en.cppreference.com/w/cpp/algorithm/for_each + */ + template + poolstl::internal::enable_if_poolstl_policy + for_each(ExecPolicy &&policy, RandIt first, RandIt last, UnaryFunction f) { + // Using a lambda instead of just calling the non-policy std::for_each because it appears to + // result in a smaller binary. + auto chunk_func = [&f](RandIt chunk_first, RandIt chunk_last) { + for (; chunk_first != chunk_last; ++chunk_first) { + f(*chunk_first); + } + }; + + if (poolstl::internal::is_seq(policy)) { + chunk_func(first, last); + return; + } + + poolstl::internal::parallel_chunk_for_1_wait(std::forward(policy), first, last, + chunk_func, (void*)nullptr, 1); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::for_each_n https://en.cppreference.com/w/cpp/algorithm/for_each_n + */ + template + poolstl::internal::enable_if_poolstl_policy + for_each_n(ExecPolicy &&policy, RandIt first, Size n, UnaryFunction f) { + RandIt last = poolstl::internal::advanced(first, n); + std::for_each(std::forward(policy), first, last, f); + return last; + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::partition https://en.cppreference.com/w/cpp/algorithm/partition + * + * Current implementation uses at most 2 threads. 
+ */ + template + poolstl::internal::enable_if_poolstl_policy + partition(ExecPolicy &&policy, RandIt first, RandIt last, Predicate pred) { + if (poolstl::internal::is_seq(policy)) { + return std::partition(first, last, pred); + } + + return poolstl::internal::partition_p2(*policy.pool(), first, last, pred); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::sort https://en.cppreference.com/w/cpp/algorithm/sort + */ + template + poolstl::internal::enable_if_poolstl_policy + sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp) { + if (poolstl::internal::is_seq(policy)) { + std::sort(first, last, comp); + return; + } + + poolstl::pluggable_sort(std::forward(policy), first, last, comp, std::sort); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::sort https://en.cppreference.com/w/cpp/algorithm/sort + */ + template + poolstl::internal::enable_if_poolstl_policy + sort(ExecPolicy &&policy, RandIt first, RandIt last) { + using T = typename std::iterator_traits::value_type; + std::sort(std::forward(policy), first, last, std::less()); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::stable_sort https://en.cppreference.com/w/cpp/algorithm/stable_sort + */ + template + poolstl::internal::enable_if_poolstl_policy + stable_sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp) { + if (poolstl::internal::is_seq(policy)) { + std::stable_sort(first, last, comp); + return; + } + + poolstl::internal::parallel_quicksort(std::forward(policy), first, last, comp, + std::stable_sort, + std::stable_partition::value_type>>, + poolstl::internal::quicksort_pivot); + } + + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::stable_sort https://en.cppreference.com/w/cpp/algorithm/stable_sort + */ + template + poolstl::internal::enable_if_poolstl_policy + stable_sort(ExecPolicy &&policy, RandIt first, RandIt last) { + using T = typename std::iterator_traits::value_type; + std::stable_sort(std::forward(policy), first, last, std::less()); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::transform https://en.cppreference.com/w/cpp/algorithm/transform + */ + template + poolstl::internal::enable_if_poolstl_policy + transform(ExecPolicy&& policy, RandIt1 first1, RandIt1 last1, + RandIt2 dest, UnaryOperation unary_op) { + if (poolstl::internal::is_seq(policy)) { + return poolstl::internal::cpp17::transform(first1, last1, dest, unary_op); + } + + auto futures = poolstl::internal::parallel_chunk_for_2(std::forward(policy), first1, last1, dest, + poolstl::internal::cpp17::transform, + (RandIt2*)nullptr, unary_op); + poolstl::internal::get_futures(futures); + return dest + std::distance(first1, last1); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::transform https://en.cppreference.com/w/cpp/algorithm/transform + */ + template + poolstl::internal::enable_if_poolstl_policy + transform(ExecPolicy&& policy, RandIt1 first1, RandIt1 last1, + RandIt2 first2, RandIt3 dest, BinaryOperation binary_op) { + if (poolstl::internal::is_seq(policy)) { + return poolstl::internal::cpp17::transform(first1, last1, first2, dest, binary_op); + } + + auto futures = poolstl::internal::parallel_chunk_for_3(std::forward(policy), first1, last1, + first2, dest, + poolstl::internal::cpp17::transform, + (RandIt3*)nullptr, binary_op); + poolstl::internal::get_futures(futures); + return dest + std::distance(first1, last1); + } + + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::all_of https://en.cppreference.com/w/cpp/algorithm/all_of + */ + template + poolstl::internal::enable_if_poolstl_policy + all_of(ExecPolicy&& policy, RandIt first, RandIt last, Predicate pred) { + return last == std::find_if_not(std::forward(policy), first, last, pred); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::none_of https://en.cppreference.com/w/cpp/algorithm/none_of + */ + template + poolstl::internal::enable_if_poolstl_policy + none_of(ExecPolicy&& policy, RandIt first, RandIt last, Predicate pred) { + return last == std::find_if(std::forward(policy), first, last, pred); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::any_of https://en.cppreference.com/w/cpp/algorithm/any_of + */ + template + poolstl::internal::enable_if_poolstl_policy + any_of(ExecPolicy&& policy, RandIt first, RandIt last, Predicate pred) { + return !std::none_of(std::forward(policy), first, last, pred); + } +} + +namespace poolstl { + + template + void for_each_chunk(RandIt first, RandIt last, ChunkConstructor construct, UnaryFunction f) { + if (first == last) { + return; + } + + auto chunk_data = construct(); + for (; first != last; ++first) { + f(*first, chunk_data); + } + } + + /** + * NOTE: Iterators are expected to be random access. + * + * Like `std::for_each`, but exposes the chunking. The `construct` method is called once per parallel chunk and + * its output is passed to `f`. + * + * Useful for cases where an expensive workspace can be shared between loop iterations + * but cannot be shared by all parallel iterations. 
+ */ + template + poolstl::internal::enable_if_poolstl_policy + for_each_chunk(ExecPolicy&& policy, RandIt first, RandIt last, ChunkConstructor construct, UnaryFunction f) { + if (poolstl::internal::is_seq(policy)) { + for_each_chunk(first, last, construct, f); + return; + } + + poolstl::internal::parallel_chunk_for_1_wait(std::forward(policy), first, last, + for_each_chunk , + (void*)nullptr, 1, construct, f); + } + + /** + * NOTE: Iterators are expected to be random access. + * + * Parallel merge sort. + * + * @param comp Comparator. + * @param sort_func Sequential sort method. Must have the same signature as the comparator version of `std::sort`. + * @param merge_func Sequential merge method. Must have the same signature as `std::inplace_merge`. + */ + template + poolstl::internal::enable_if_poolstl_policy + pluggable_mergesort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp, + void (sort_func)(RandIt, RandIt, Compare) = std::sort, + void (merge_func)(RandIt, RandIt, RandIt, Compare) = std::inplace_merge) { + if (poolstl::internal::is_seq(policy)) { + sort_func(first, last, comp); + return; + } + + poolstl::internal::parallel_mergesort(std::forward(policy), + first, last, comp, sort_func, merge_func); + } + + /** + * NOTE: Iterators are expected to be random access. + * + * Parallel merge sort. + * + * Uses `std::less` comparator. + * + * @param sort_func Sequential sort method. Must have the same signature as the comparator version of `std::sort`. + * @param merge_func Sequential merge method. Must have the same signature as `std::inplace_merge`. 
+ */ + template + poolstl::internal::enable_if_poolstl_policy + pluggable_mergesort(ExecPolicy &&policy, RandIt first, RandIt last, + void (sort_func)(RandIt, RandIt, + std::less::value_type>) = std::sort, + void (merge_func)(RandIt, RandIt, RandIt, + std::less::value_type>) = std::inplace_merge){ + using T = typename std::iterator_traits::value_type; + pluggable_mergesort(std::forward(policy), first, last, std::less(), sort_func, merge_func); + } + + /** + * NOTE: Iterators are expected to be random access. + * + * Parallel quicksort that allows specifying the sequential sort and partition methods. + * + * @param comp Comparator. + * @param sort_func Sequential sort method to use once range is sufficiently partitioned. Must have the same + * signature as the comparator version of `std::sort`. + * @param part_func Sequential partition method. Must have the same signature as `std::partition`. + * @param pivot_func Method that identifies the pivot element + */ + template + poolstl::internal::enable_if_poolstl_policy + pluggable_quicksort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp, + void (sort_func)(RandIt, RandIt, Compare) = std::sort, + RandIt (part_func)(RandIt, RandIt, poolstl::internal::pivot_predicate::value_type>) = std::partition, + typename std::iterator_traits::value_type (pivot_func)(RandIt, RandIt) = + poolstl::internal::quicksort_pivot) { + if (poolstl::internal::is_seq(policy)) { + sort_func(first, last, comp); + return; + } + + poolstl::internal::parallel_quicksort(std::forward(policy), + first, last, comp, sort_func, part_func, pivot_func); + } + + /** + * NOTE: Iterators are expected to be random access. + * + * Parallel quicksort that allows specifying the sequential sort and partition methods. + * + * Uses `std::less` comparator. + * + * @param sort_func Sequential sort method to use once range is sufficiently partitioned. Must have the same + * signature as the comparator version of `std::sort`. 
+ * @param part_func Sequential partition method. Must have the same signature as `std::partition`. + * @param pivot_func Method that identifies the pivot element + */ + template + poolstl::internal::enable_if_poolstl_policy + pluggable_quicksort(ExecPolicy &&policy, RandIt first, RandIt last, + void (sort_func)(RandIt, RandIt, + std::less::value_type>) = std::sort, + RandIt (part_func)(RandIt, RandIt, poolstl::internal::pivot_predicate< + std::less::value_type>, + typename std::iterator_traits::value_type>) = std::partition, + typename std::iterator_traits::value_type (pivot_func)(RandIt, RandIt) = + poolstl::internal::quicksort_pivot) { + using T = typename std::iterator_traits::value_type; + pluggable_quicksort(std::forward(policy), first, last, std::less(), + sort_func, part_func, pivot_func); + } +} + +#endif + +#ifndef POOLSTL_NUMERIC_HPP +#define POOLSTL_NUMERIC_HPP + +#include + + +namespace std { + +#if POOLSTL_HAVE_CXX17_LIB + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::exclusive_scan https://en.cppreference.com/w/cpp/algorithm/exclusive_scan + */ + template + poolstl::internal::enable_if_poolstl_policy + exclusive_scan(ExecPolicy &&policy, RandIt1 first, RandIt1 last, RandIt2 dest, T init, BinaryOp binop) { + if (first == last) { + return dest; + } + + if (poolstl::internal::is_seq(policy)) { + return std::exclusive_scan(first, last, dest, init, binop); + } + + // Pass 1: Chunk the input and find the sum of each chunk + auto futures = poolstl::internal::parallel_chunk_for_gen(std::forward(policy), first, last, + [binop](RandIt1 chunk_first, RandIt1 chunk_last) { + auto sum = std::accumulate(chunk_first, chunk_last, T{}, binop); + return std::make_tuple(std::make_pair(chunk_first, chunk_last), sum); + }); + + std::vector> ranges; + std::vector sums; + + for (auto& future : futures) { + auto res = future.get(); + ranges.push_back(std::get<0>(res)); + sums.push_back(std::get<1>(res)); + } + + // find initial values for each range + std::exclusive_scan(sums.begin(), sums.end(), sums.begin(), init, binop); + + // Pass 2: perform exclusive scan of each chunk, using the sum of previous chunks as init + std::vector> args; + for (std::size_t i = 0; i < sums.size(); ++i) { + auto chunk_first = std::get<0>(ranges[i]); + args.emplace_back(std::make_tuple( + chunk_first, std::get<1>(ranges[i]), + dest + (chunk_first - first), + sums[i])); + } + + auto futures2 = poolstl::internal::parallel_apply(std::forward(policy), + [binop](RandIt1 chunk_first, RandIt1 chunk_last, RandIt2 chunk_dest, T chunk_init){ + std::exclusive_scan(chunk_first, chunk_last, chunk_dest, chunk_init, binop); + }, args); + + poolstl::internal::get_futures(futures2); + return dest + (last - first); + } + + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::exclusive_scan https://en.cppreference.com/w/cpp/algorithm/exclusive_scan + */ + template + poolstl::internal::enable_if_poolstl_policy + exclusive_scan(ExecPolicy &&policy, RandIt1 first, RandIt1 last, RandIt2 dest, T init) { + return std::exclusive_scan(std::forward(policy), first, last, dest, init, std::plus()); + } +#endif + + /** + * NOTE: Iterators are expected to be random access. + * See std::reduce https://en.cppreference.com/w/cpp/algorithm/reduce + */ + template + poolstl::internal::enable_if_poolstl_policy + reduce(ExecPolicy &&policy, RandIt first, RandIt last, T init, BinaryOp binop) { + if (poolstl::internal::is_seq(policy)) { + return poolstl::internal::cpp17::reduce(first, last, init, binop); + } + + auto futures = poolstl::internal::parallel_chunk_for_1(std::forward(policy), first, last, + poolstl::internal::cpp17::reduce, + (T*)nullptr, 1, init, binop); + + return poolstl::internal::cpp17::reduce( + poolstl::internal::get_wrap(futures.begin()), + poolstl::internal::get_wrap(futures.end()), init, binop); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::reduce https://en.cppreference.com/w/cpp/algorithm/reduce + */ + template + poolstl::internal::enable_if_poolstl_policy + reduce(ExecPolicy &&policy, RandIt first, RandIt last, T init) { + return std::reduce(std::forward(policy), first, last, init, std::plus()); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::reduce https://en.cppreference.com/w/cpp/algorithm/reduce + */ + template + poolstl::internal::enable_if_poolstl_policy< + ExecPolicy, typename std::iterator_traits::value_type> + reduce(ExecPolicy &&policy, RandIt first, RandIt last) { + return std::reduce(std::forward(policy), first, last, + typename std::iterator_traits::value_type{}); + } + +#if POOLSTL_HAVE_CXX17_LIB + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::transform_reduce https://en.cppreference.com/w/cpp/algorithm/transform_reduce + */ + template + poolstl::internal::enable_if_poolstl_policy + transform_reduce(ExecPolicy&& policy, RandIt1 first1, RandIt1 last1, T init, + BinaryReductionOp reduce_op, UnaryTransformOp transform_op) { + if (poolstl::internal::is_seq(policy)) { + return std::transform_reduce(first1, last1, init, reduce_op, transform_op); + } + + auto futures = poolstl::internal::parallel_chunk_for_1(std::forward(policy), first1, last1, + std::transform_reduce, + (T*)nullptr, 1, init, reduce_op, transform_op); + + return poolstl::internal::cpp17::reduce( + poolstl::internal::get_wrap(futures.begin()), + poolstl::internal::get_wrap(futures.end()), init, reduce_op); + } + + /** + * NOTE: Iterators are expected to be random access. + * See std::transform_reduce https://en.cppreference.com/w/cpp/algorithm/transform_reduce + */ + template + poolstl::internal::enable_if_poolstl_policy + transform_reduce(ExecPolicy&& policy, RandIt1 first1, RandIt1 last1, RandIt2 first2, T init, + BinaryReductionOp reduce_op, BinaryTransformOp transform_op) { + if (poolstl::internal::is_seq(policy)) { + return std::transform_reduce(first1, last1, first2, init, reduce_op, transform_op); + } + + auto futures = poolstl::internal::parallel_chunk_for_2(std::forward(policy), first1, last1, first2, + std::transform_reduce, + (T*)nullptr, init, reduce_op, transform_op); + + return poolstl::internal::cpp17::reduce( + poolstl::internal::get_wrap(futures.begin()), + poolstl::internal::get_wrap(futures.end()), init, reduce_op); + } + + /** + * NOTE: Iterators are expected to be random access. 
+ * See std::transform_reduce https://en.cppreference.com/w/cpp/algorithm/transform_reduce + */ + template< class ExecPolicy, class RandIt1, class RandIt2, class T > + poolstl::internal::enable_if_poolstl_policy + transform_reduce(ExecPolicy&& policy, RandIt1 first1, RandIt1 last1, RandIt2 first2, T init ) { + return transform_reduce(std::forward(policy), + first1, last1, first2, init, std::plus<>(), std::multiplies<>()); + } +#endif + +} + +#endif + +// Note that iota_iter.hpp is self-contained in its own right. + +#ifndef POOLSTL_IOTA_ITER_HPP +#define POOLSTL_IOTA_ITER_HPP + +#include +#include + +namespace poolstl { + + /** + * An iterator over the integers. + * + * Effectively a view on a fictional vector populated by std::iota, but without materializing anything. + * + * Useful to parallelize loops that are not over a container, like this: + * + * \code{.cpp} + * for (int i = 0; i < 10; ++i) { + * } + *\endcode + * + * Becomes: + * \code{.cpp} + * std::for_each(iota_iter(0), iota_iter(10), [](int i) { + * }); + * \endcode + * + * @tparam T A type that acts as an integer. 
+ */ + template + class iota_iter { + public: + using value_type = T; + using difference_type = std::ptrdiff_t; + using pointer = T *; + using reference = T; + using iterator_category = std::random_access_iterator_tag; + + iota_iter() : value{} {} + explicit iota_iter(T rhs) : value(rhs) {} + iota_iter(const iota_iter &rhs) : value(rhs.value) {} + + iota_iter &operator=(T rhs) { value = rhs; return *this; } + iota_iter &operator=(const iota_iter &rhs) { value = rhs.value; return *this; } + + reference operator*() const { return value; } + reference operator[](difference_type rhs) const { return value + rhs; } + // operator-> has no meaning in this application + + bool operator==(const iota_iter &rhs) const { return value == rhs.value; } + bool operator!=(const iota_iter &rhs) const { return value != rhs.value; } + bool operator<(const iota_iter &rhs) const { return value < rhs.value; } + bool operator>(const iota_iter &rhs) const { return value > rhs.value; } + bool operator<=(const iota_iter &rhs) const { return value <= rhs.value; } + bool operator>=(const iota_iter &rhs) const { return value >= rhs.value; } + + iota_iter &operator+=(difference_type rhs) { value += rhs; return *this; } + iota_iter &operator-=(difference_type rhs) { value -= rhs; return *this; } + + iota_iter &operator++() { ++value; return *this; } + iota_iter &operator--() { --value; return *this; } + iota_iter operator++(int) { iota_iter ret(value); ++value; return ret; } + iota_iter operator--(int) { iota_iter ret(value); --value; return ret; } + + difference_type operator-(const iota_iter &rhs) const { return value - rhs.value; } + iota_iter operator-(difference_type rhs) const { return iota_iter(value - rhs); } + iota_iter operator+(difference_type rhs) const { return iota_iter(value + rhs); } + + friend inline iota_iter operator+(difference_type lhs, const iota_iter &rhs) { + return iota_iter(lhs + rhs.value); + } + + protected: + T value; + }; +} + +namespace std { + /** + * Specialize 
std::iterator_traits for poolstl::iota_iter. + */ + template + struct iterator_traits> { + using value_type = typename poolstl::iota_iter::value_type; + using difference_type = typename poolstl::iota_iter::difference_type; + using pointer = typename poolstl::iota_iter::pointer; + using reference = typename poolstl::iota_iter::reference; + using iterator_category = typename poolstl::iota_iter::iterator_category; + }; +} + +#endif + +/* + * Optionally alias `poolstl::par` as `std::execution::par` to enable poolSTL to fill in for missing compiler support. + * + * USE AT YOUR OWN RISK! + * + * To use this define POOLSTL_STD_SUPPLEMENT=1 before including poolstl.hpp. + * + * Attempts to autodetect native support by checking for <execution>, including it if it exists, and then checking for + * the __cpp_lib_parallel_algorithm feature macro. + * + * If native support is not found then the standard execution policies are declared as forwards to poolSTL. + * + * GCC and Clang: TBB is required if <execution> is #included. If you'd like to use the poolSTL supplement in cases + * that TBB is not available, have your build system define POOLSTL_STD_SUPPLEMENT_NO_INCLUDE if TBB is not found. + * PoolSTL will then not include <execution> and the supplement will kick in. + * Your code must not #include <execution>. + * + * MinGW: the compiler declares support, but actual performance is sequential (see poolSTL benchmark). To use + * the supplement anyway define POOLSTL_STD_SUPPLEMENT_FORCE to override the autodetection. + * Your code must not #include <execution>. + * + * Define POOLSTL_ALLOW_SUPPLEMENT=0 to override POOLSTL_STD_SUPPLEMENT and disable this feature.
+ */ +#ifndef POOLSTL_ALLOW_SUPPLEMENT +#define POOLSTL_ALLOW_SUPPLEMENT 1 +#endif + +#if POOLSTL_ALLOW_SUPPLEMENT && defined(POOLSTL_STD_SUPPLEMENT) + +#if __cplusplus >= 201603L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201603L) +#if __has_include() +#ifndef POOLSTL_STD_SUPPLEMENT_NO_INCLUDE +#endif +#endif +#endif + +#if !defined(__cpp_lib_parallel_algorithm) || defined(POOLSTL_STD_SUPPLEMENT_FORCE) +namespace std { + namespace execution { + using ::poolstl::execution::sequenced_policy; + using ::poolstl::execution::seq; + using ::poolstl::execution::parallel_policy; + using ::poolstl::execution::par; + using parallel_unsequenced_policy = ::poolstl::execution::parallel_policy; + constexpr parallel_unsequenced_policy par_unseq{}; + } +} + +#endif +#endif + +#endif diff --git a/include/bitcoin/system/preprocessor.hpp b/include/bitcoin/system/preprocessor.hpp index e24982dfe5..75170c76f5 100644 --- a/include/bitcoin/system/preprocessor.hpp +++ b/include/bitcoin/system/preprocessor.hpp @@ -213,6 +213,7 @@ #define std_reduce(p, b, e, i, l) std::reduce((p), (b), (e), (i), (l)) #define std_transform(p, b, e, t, l) std::transform((p), (b), (e), (t), (l)) #else + #include #define std_any_of(p, b, e, l) std::any_of((b), (e), (l)) #define std_all_of(p, b, e, l) std::all_of((b), (e), (l)) #define std_for_each(p, b, e, l) std::for_each((b), (e), (l)) diff --git a/src/define.cpp b/src/define.cpp index 213818e426..40e097b77d 100644 --- a/src/define.cpp +++ b/src/define.cpp @@ -25,8 +25,9 @@ // System inclusions are chained as follows. // version : +// execution : // have : version -// preprocessor : have +// preprocessor : have, execution // warnings : preprocessor // boost : warnings // exceptions : boost