diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index cc22478ac7..bf797e36ec 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -23,13 +23,12 @@ jobs:
optimization: "debug"
assert: "debug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
cc: "clang-15"
flags: "-Og -fPIE"
- options: "--enable-isystem --enable-avx2 --enable-sse41"
+ options: "--enable-isystem --enable-avx2 --enable-sse4"
packager: "apt"
packages: ""
@@ -39,7 +38,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: "--build-icu --with-icu"
secp256k1: "--build-secp256k1"
@@ -55,13 +53,12 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
cc: "gcc-11"
flags: "-Os -fPIE"
- options: "--enable-isystem --enable-sse41"
+ options: "--enable-isystem --enable-sse4"
packager: "apt"
packages: ""
@@ -71,7 +68,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "cov"
- detectcpuflags: "detect"
boost: "--build-boost"
icu: "--build-icu --with-icu"
secp256k1: "--build-secp256k1"
@@ -87,7 +83,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
@@ -103,7 +98,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
@@ -142,29 +136,6 @@ jobs:
run: |
brew install autoconf automake libtool ${{ matrix.packages }}
- - name: Determine CPU flags
- shell: bash
- run: |
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sse4_1 ") ]]; then
- echo "CPU_SUPPORT_SSE41=--enable-sse41" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx " | grep " avx2 ") ]]; then
- echo "CPU_SUPPORT_AVX2=--enable-avx2" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx512bw ") ]]; then
- echo "CPU_SUPPORT_AVX512=--enable-avx512" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sha_ni ") ]]; then
- echo "CPU_SUPPORT_SHANI=--enable-shani" >> $GITHUB_ENV
- fi
-
- if [[ ${{ matrix.detectcpuflags }} == 'detect' ]]; then
- echo "CPU_SUPPORTED_FLAGS='$CPU_SUPPORT_SSE41 $CPU_SUPPORT_AVX2 $CPU_SUPPORT_AVX512 $CPU_SUPPORT_SHANI'" >> $GITHUB_ENV
- fi
-
- name: Denormalize parameterization
shell: bash
run: |
@@ -203,7 +174,6 @@ jobs:
--prefix=${{ env.LIBBITCOIN_SRC_PATH }}prefix
${{ env.LINKAGE }}
${{ env.ASSERT_NDEBUG }}
- ${{ env.CPU_SUPPORTED_FLAGS }}
${{ matrix.boost }}
${{ matrix.icu }}
${{ matrix.secp256k1 }}
@@ -285,13 +255,12 @@ jobs:
optimization: "debug"
assert: "debug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
cc: "clang-15"
flags: "-Og -fPIE"
- options: "-Denable-avx2=on -Denable-sse41=on"
+ options: "-Denable-avx2=on -Denable-sse4=on"
packager: "apt"
packages: ""
@@ -301,7 +270,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: "--build-icu --with-icu"
secp256k1: "--build-secp256k1"
@@ -317,13 +285,12 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
cc: "gcc-11"
flags: "-Os -fPIE"
- options: "-Denable-sse41=on"
+ options: "-Denable-sse4=on"
packager: "apt"
packages: ""
@@ -333,7 +300,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: "--build-icu --with-icu"
secp256k1: "--build-secp256k1"
@@ -349,7 +315,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
@@ -365,7 +330,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
@@ -404,29 +368,6 @@ jobs:
run: |
brew install autoconf automake libtool ${{ matrix.packages }}
- - name: Determine CPU flags
- shell: bash
- run: |
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sse4_1 ") ]]; then
- echo "CPU_SUPPORT_SSE41=-Denable-sse41=on" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx " | grep " avx2 ") ]]; then
- echo "CPU_SUPPORT_AVX2=-Denable-avx2=on" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx512bw ") ]]; then
- echo "CPU_SUPPORT_AVX512=-Denable-avx512=on" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sha_ni ") ]]; then
- echo "CPU_SUPPORT_SHANI=-Denable-shani=on" >> $GITHUB_ENV
- fi
-
- if [[ ${{ matrix.detectcpuflags }} == 'detect' ]]; then
- echo "CPU_SUPPORTED_FLAGS='$CPU_SUPPORT_SSE41 $CPU_SUPPORT_AVX2 $CPU_SUPPORT_AVX512 $CPU_SUPPORT_SHANI'" >> $GITHUB_ENV
- fi
-
- name: Denormalize parameterization
shell: bash
run: |
@@ -468,7 +409,6 @@ jobs:
--prefix=${{ env.LIBBITCOIN_SRC_PATH }}prefix
${{ env.LINKAGE }}
${{ env.ASSERT_NDEBUG }}
- ${{ env.CPU_SUPPORTED_FLAGS }}
${{ matrix.boost }}
${{ matrix.icu }}
${{ matrix.secp256k1 }}
@@ -561,13 +501,12 @@ jobs:
optimization: "debug"
assert: "debug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
cc: "clang-15"
flags: "-Og -fPIE"
- options: "-Denable-avx2=on -Denable-sse41=on"
+ options: "-Denable-avx2=on -Denable-sse4=on"
packager: "apt"
packages: ""
@@ -578,7 +517,6 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: "--build-icu --with-icu"
secp256k1: "--build-secp256k1"
@@ -595,13 +533,12 @@ jobs:
optimization: "size"
assert: "ndebug"
coverage: "nocov"
- detectcpuflags: "ignore"
boost: "--build-boost"
icu: ""
secp256k1: "--build-secp256k1"
cc: "gcc-11"
flags: "-Os -fPIE"
- options: "-Denable-sse41=on"
+ options: "-Denable-sse4=on"
packager: "apt"
packages: ""
@@ -634,29 +571,6 @@ jobs:
run: |
brew install autoconf automake libtool ${{ matrix.packages }}
- - name: Determine CPU flags
- shell: bash
- run: |
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sse4_1 ") ]]; then
- echo "CPU_SUPPORT_SSE41=-Denable-sse41=on" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx " | grep " avx2 ") ]]; then
- echo "CPU_SUPPORT_AVX2=-Denable-avx2=on" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " avx512bw ") ]]; then
- echo "CPU_SUPPORT_AVX512=-Denable-avx512=on" >> $GITHUB_ENV
- fi
-
- if [[ -n $(cat /proc/cpuinfo | grep flags | grep " sha_ni ") ]]; then
- echo "CPU_SUPPORT_SHANI=-Denable-shani=on" >> $GITHUB_ENV
- fi
-
- if [[ ${{ matrix.detectcpuflags }} == 'detect' ]]; then
- echo "CPU_SUPPORTED_FLAGS='$CPU_SUPPORT_SSE41 $CPU_SUPPORT_AVX2 $CPU_SUPPORT_AVX512 $CPU_SUPPORT_SHANI'" >> $GITHUB_ENV
- fi
-
- name: Denormalize parameterization
shell: bash
run: |
@@ -699,7 +613,6 @@ jobs:
--preset=${{ matrix.preset }}
${{ env.LINKAGE }}
${{ env.ASSERT_NDEBUG }}
- ${{ env.CPU_SUPPORTED_FLAGS }}
${{ matrix.boost }}
${{ matrix.icu }}
${{ matrix.secp256k1 }}
diff --git a/Makefile.am b/Makefile.am
index 9a6fa4519e..bdcc4e8dc8 100755
--- a/Makefile.am
+++ b/Makefile.am
@@ -425,6 +425,7 @@ include_bitcoin_system_HEADERS = \
include/bitcoin/system/constraints.hpp \
include/bitcoin/system/define.hpp \
include/bitcoin/system/exceptions.hpp \
+ include/bitcoin/system/execution.hpp \
include/bitcoin/system/forks.hpp \
include/bitcoin/system/funclets.hpp \
include/bitcoin/system/have.hpp \
diff --git a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj
index 13504ad87a..b3f7b885e5 100644
--- a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj
+++ b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj
@@ -331,6 +331,7 @@
+    <ClInclude Include="..\..\..\..\include\bitcoin\system\execution.hpp" />
diff --git a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters
index c4080e871a..837e842b9c 100644
--- a/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters
+++ b/builds/msvc/vs2022/libbitcoin-system/libbitcoin-system.vcxproj.filters
@@ -869,6 +869,9 @@
include\bitcoin\system
+    <ClInclude Include="..\..\..\..\include\bitcoin\system\execution.hpp">
+      <Filter>include\bitcoin\system</Filter>
+    </ClInclude>
include\bitcoin\system
diff --git a/include/bitcoin/system.hpp b/include/bitcoin/system.hpp
index 5d20e1783e..a269944bed 100755
--- a/include/bitcoin/system.hpp
+++ b/include/bitcoin/system.hpp
@@ -21,6 +21,7 @@
#include <bitcoin/system/constraints.hpp>
#include <bitcoin/system/define.hpp>
#include <bitcoin/system/exceptions.hpp>
+#include <bitcoin/system/execution.hpp>
#include <bitcoin/system/forks.hpp>
#include <bitcoin/system/funclets.hpp>
#include <bitcoin/system/have.hpp>
diff --git a/include/bitcoin/system/execution.hpp b/include/bitcoin/system/execution.hpp
new file mode 100644
index 0000000000..c3d3c5c3e6
--- /dev/null
+++ b/include/bitcoin/system/execution.hpp
@@ -0,0 +1,2819 @@
+/**
+ * Thread pool-based implementation of parallel standard library algorithms.
+ * github.com/alugowski/poolSTL
+ *
+ * MIT License:
+ *
+ * Copyright (c) 2023 Adam Lugowski
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+
+#ifndef POOLSTL_HPP
+#define POOLSTL_HPP
+
+
+#ifndef POOLSTL_EXECUTION_HPP
+#define POOLSTL_EXECUTION_HPP
+
+#include <memory>
+#include <mutex>
+#include <stdexcept>
+#include <thread>
+
+
+#ifndef AL_TASK_THREAD_POOL_HPP
+#define AL_TASK_THREAD_POOL_HPP
+
+// Version macros.
+#define TASK_THREAD_POOL_VERSION_MAJOR 1
+#define TASK_THREAD_POOL_VERSION_MINOR 0
+#define TASK_THREAD_POOL_VERSION_PATCH 10
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <type_traits>
+
+// MSVC does not correctly set the __cplusplus macro by default, so we must read it from _MSVC_LANG
+// See https://devblogs.microsoft.com/cppblog/msvc-now-correctly-reports-__cplusplus/
+#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)
+#define TTP_CXX17 1
+#else
+#define TTP_CXX17 0
+#endif
+
+#if TTP_CXX17
+#define TTP_NODISCARD [[nodiscard]]
+#else
+#define TTP_NODISCARD
+#endif
+
+namespace task_thread_pool {
+
+#if !TTP_CXX17
+ /**
+ * A reimplementation of std::decay_t, which is only available since C++14.
+ */
+    template <class T>
+    using decay_t = typename std::decay<T>::type;
+#endif
+
+ /**
+ * A fast and lightweight thread pool that uses C++11 threads.
+ */
+ class task_thread_pool {
+ public:
+ /**
+ * Create a task_thread_pool and start worker threads.
+ *
+ * @param num_threads Number of worker threads. If 0 then number of threads is equal to the
+ * number of physical cores on the machine, as given by std::thread::hardware_concurrency().
+ */
+ explicit task_thread_pool(unsigned int num_threads = 0) {
+ if (num_threads < 1) {
+ num_threads = std::thread::hardware_concurrency();
+ if (num_threads < 1) { num_threads = 1; }
+ }
+ start_threads(num_threads);
+ }
+
+ /**
+ * Finish all tasks left in the queue then shut down worker threads.
+ * If the pool is currently paused then it is resumed.
+ */
+ ~task_thread_pool() {
+ unpause();
+ wait_for_queued_tasks();
+ stop_all_threads();
+ }
+
+ /**
+ * Drop all tasks that have been submitted but not yet started by a worker.
+ *
+ * Tasks already in progress continue executing.
+ */
+ void clear_task_queue() {
+ const std::lock_guard tasks_lock(task_mutex);
+ tasks = {};
+ }
+
+ /**
+ * Get number of enqueued tasks.
+ *
+ * @return Number of tasks that have been enqueued but not yet started.
+ */
+ TTP_NODISCARD size_t get_num_queued_tasks() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return tasks.size();
+ }
+
+ /**
+ * Get number of in-progress tasks.
+ *
+ * @return Approximate number of tasks currently being processed by worker threads.
+ */
+ TTP_NODISCARD size_t get_num_running_tasks() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return num_inflight_tasks;
+ }
+
+ /**
+ * Get total number of tasks in the pool.
+ *
+ * @return Approximate number of tasks both enqueued and running.
+ */
+ TTP_NODISCARD size_t get_num_tasks() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return tasks.size() + num_inflight_tasks;
+ }
+
+ /**
+ * Get number of worker threads.
+ *
+ * @return Number of worker threads.
+ */
+ TTP_NODISCARD unsigned int get_num_threads() const {
+ const std::lock_guard threads_lock(thread_mutex);
+            return static_cast<unsigned int>(threads.size());
+ }
+
+ /**
+ * Set number of worker threads. Will start or stop worker threads as necessary.
+ *
+ * @param num_threads Number of worker threads. If 0 then number of threads is equal to the
+ * number of physical cores on the machine, as given by std::thread::hardware_concurrency().
+ * @return Previous number of worker threads.
+ */
+ unsigned int set_num_threads(unsigned int num_threads) {
+ const std::lock_guard threads_lock(thread_mutex);
+ unsigned int previous_num_threads = get_num_threads();
+
+ if (num_threads < 1) {
+ num_threads = std::thread::hardware_concurrency();
+ if (num_threads < 1) { num_threads = 1; }
+ }
+
+ if (previous_num_threads <= num_threads) {
+ // expanding the thread pool
+ start_threads(num_threads - previous_num_threads);
+ } else {
+ // contracting the thread pool
+ stop_all_threads();
+ {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_running = true;
+ }
+ start_threads(num_threads);
+ }
+
+ return previous_num_threads;
+ }
+
+ /**
+ * Stop executing queued tasks. Use `unpause()` to resume. Note: Destroying the pool will implicitly unpause.
+ *
+ * Any in-progress tasks continue executing.
+ */
+ void pause() {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_paused = true;
+ }
+
+ /**
+ * Resume executing queued tasks.
+ */
+ void unpause() {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_paused = false;
+ task_cv.notify_all();
+ }
+
+ /**
+ * Check whether the pool is paused.
+ *
+ * @return true if pause() has been called without an intervening unpause().
+ */
+ TTP_NODISCARD bool is_paused() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return pool_paused;
+ }
+
+ /**
+ * Submit a Callable for the pool to execute and return a std::future.
+ *
+ * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc.
+ * @param args Arguments for func. Optional.
+ * @return std::future that can be used to get func's return value or thrown exception.
+ */
+        template <typename F, typename... A,
+#if TTP_CXX17
+            typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>
+#else
+            typename R = typename std::result_of<decay_t<F>(decay_t<A>...)>::type
+#endif
+            >
+        TTP_NODISCARD std::future<R> submit(F&& func, A&&... args) {
+#if defined(_MSC_VER)
+ // MSVC's packaged_task is not movable even though it should be.
+ // Discussion about this bug and its future fix:
+ // https://developercommunity.visualstudio.com/t/unable-to-move-stdpackaged-task-into-any-stl-conta/108672
+            std::shared_ptr<std::packaged_task<R()>> ptask =
+                std::make_shared<std::packaged_task<R()>>(std::bind(std::forward<F>(func), std::forward<A>(args)...));
+ submit_detach([ptask] { (*ptask)(); });
+ return ptask->get_future();
+#else
+            std::packaged_task<R()> task(std::bind(std::forward<F>(func), std::forward<A>(args)...));
+ auto ret = task.get_future();
+ submit_detach(std::move(task));
+ return ret;
+#endif
+ }
+
+ /**
+ * Submit a zero-argument Callable for the pool to execute.
+ *
+ * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc.
+ */
+        template <typename F>
+        void submit_detach(F&& func) {
+            const std::lock_guard<std::mutex> tasks_lock(task_mutex);
+            tasks.emplace(std::forward<F>(func));
+            task_cv.notify_one();
+        }
+
+ /**
+ * Submit a Callable with arguments for the pool to execute.
+ *
+ * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc.
+ */
+        template <typename F, typename... A>
+        void submit_detach(F&& func, A&&... args) {
+            const std::lock_guard<std::mutex> tasks_lock(task_mutex);
+            tasks.emplace(std::bind(std::forward<F>(func), std::forward<A>(args)...));
+            task_cv.notify_one();
+        }
+
+ /**
+ * Block until the task queue is empty. Some tasks may be in-progress when this method returns.
+ */
+ void wait_for_queued_tasks() {
+ std::unique_lock tasks_lock(task_mutex);
+ notify_task_finish = true;
+ task_finished_cv.wait(tasks_lock, [&] { return tasks.empty(); });
+ notify_task_finish = false;
+ }
+
+ /**
+ * Block until all tasks have finished.
+ */
+ void wait_for_tasks() {
+ std::unique_lock tasks_lock(task_mutex);
+ notify_task_finish = true;
+ task_finished_cv.wait(tasks_lock, [&] { return tasks.empty() && num_inflight_tasks == 0; });
+ notify_task_finish = false;
+ }
+
+ protected:
+
+ /**
+ * Main function for worker threads.
+ */
+ void worker_main() {
+ bool finished_task = false;
+
+ while (true) {
+ std::unique_lock tasks_lock(task_mutex);
+
+ if (finished_task) {
+ --num_inflight_tasks;
+ if (notify_task_finish) {
+ task_finished_cv.notify_all();
+ }
+ }
+
+ task_cv.wait(tasks_lock, [&]() { return !pool_running || (!pool_paused && !tasks.empty()); });
+
+ if (!pool_running) {
+ break;
+ }
+
+ // Must mean that (!pool_paused && !tasks.empty()) is true
+
+                std::packaged_task<void()> task{std::move(tasks.front())};
+ tasks.pop();
+ ++num_inflight_tasks;
+ tasks_lock.unlock();
+
+ try {
+ task();
+ } catch (...) {
+ // std::packaged_task::operator() may throw in some error conditions, such as if the task
+ // had already been run. Nothing that the pool can do anything about.
+ }
+
+ finished_task = true;
+ }
+ }
+
+ /**
+ * Start worker threads.
+ *
+ * @param num_threads How many threads to start.
+ */
+ void start_threads(const unsigned int num_threads) {
+ const std::lock_guard threads_lock(thread_mutex);
+
+ for (unsigned int i = 0; i < num_threads; ++i) {
+ threads.emplace_back(&task_thread_pool::worker_main, this);
+ }
+ }
+
+ /**
+ * Stop, join, and destroy all worker threads.
+ */
+ void stop_all_threads() {
+ const std::lock_guard threads_lock(thread_mutex);
+
+ {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_running = false;
+ task_cv.notify_all();
+ }
+
+ for (auto& thread : threads) {
+ if (thread.joinable()) {
+ thread.join();
+ }
+ }
+ threads.clear();
+ }
+
+ /**
+ * The worker threads.
+ *
+ * Access protected by thread_mutex
+ */
+        std::vector<std::thread> threads;
+
+ /**
+ * A mutex for methods that start/stop threads.
+ */
+ mutable std::recursive_mutex thread_mutex;
+
+ /**
+ * The task queue.
+ *
+ * Access protected by task_mutex.
+ */
+        std::queue<std::packaged_task<void()>> tasks = {};
+
+ /**
+ * A mutex for all variables related to tasks.
+ */
+ mutable std::mutex task_mutex;
+
+ /**
+ * Used to notify changes to the task queue, such as a new task added, pause/unpause, etc.
+ */
+ std::condition_variable task_cv;
+
+ /**
+ * Used to notify of finished tasks.
+ */
+ std::condition_variable task_finished_cv;
+
+ /**
+ * A signal for worker threads that the pool is either running or shutting down.
+ *
+ * Access protected by task_mutex.
+ */
+ bool pool_running = true;
+
+ /**
+ * A signal for worker threads to not pull new tasks from the queue.
+ *
+ * Access protected by task_mutex.
+ */
+ bool pool_paused = false;
+
+ /**
+ * A signal for worker threads that they should notify task_finished_cv when they finish a task.
+ *
+ * Access protected by task_mutex.
+ */
+ bool notify_task_finish = false;
+
+ /**
+ * A counter of the number of tasks in-progress by worker threads.
+ * Incremented when a task is popped off the task queue and decremented when that task is complete.
+ *
+ * Access protected by task_mutex.
+ */
+ int num_inflight_tasks = 0;
+ };
+}
+
+// clean up
+#undef TTP_NODISCARD
+#undef TTP_CXX17
+
+#endif
+
+#ifndef POOLSTL_INTERNAL_UTILS_HPP
+#define POOLSTL_INTERNAL_UTILS_HPP
+
+// Version macros.
+#define POOLSTL_VERSION_MAJOR 0
+#define POOLSTL_VERSION_MINOR 3
+#define POOLSTL_VERSION_PATCH 5
+
+#include <cstddef>
+#include <iterator>
+
+#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)
+#define POOLSTL_HAVE_CXX17 1
+#define POOLSTL_NO_DISCARD [[nodiscard]]
+#else
+#define POOLSTL_HAVE_CXX17 0
+#define POOLSTL_NO_DISCARD
+#endif
+
+#if POOLSTL_HAVE_CXX17 && (!defined(_GLIBCXX_RELEASE) || _GLIBCXX_RELEASE >= 9)
+#define POOLSTL_HAVE_CXX17_LIB 1
+#else
+#define POOLSTL_HAVE_CXX17_LIB 0
+#endif
+
+#if __cplusplus >= 201402L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201402L)
+#define POOLSTL_HAVE_CXX14 1
+#else
+#define POOLSTL_HAVE_CXX14 0
+#endif
+
+namespace poolstl {
+ namespace internal {
+
+ inline constexpr std::size_t get_chunk_size(std::size_t num_steps, unsigned int num_threads) {
+ return (num_steps / num_threads) + ((num_steps % num_threads) > 0 ? 1 : 0);
+ }
+
+        template <typename Iterator>
+        constexpr typename std::iterator_traits<Iterator>::difference_type
+        get_chunk_size(Iterator first, Iterator last, unsigned int num_threads) {
+            using diff_t = typename std::iterator_traits<Iterator>::difference_type;
+            return static_cast<diff_t>(get_chunk_size((std::size_t)std::distance(first, last), num_threads));
+        }
+
+        template <typename Iterator>
+        constexpr typename std::iterator_traits<Iterator>::difference_type
+        get_iter_chunk_size(const Iterator& iter, const Iterator& last,
+                            typename std::iterator_traits<Iterator>::difference_type chunk_size) {
+            return std::min(chunk_size, std::distance(iter, last));
+        }
+
+        template <typename Iterator>
+        Iterator advanced(Iterator iter, typename std::iterator_traits<Iterator>::difference_type offset) {
+            Iterator ret = iter;
+            std::advance(ret, offset);
+            return ret;
+        }
+
+ /**
+ * An iterator wrapper that calls std::future<>::get().
+ * @tparam Iterator
+ */
+        template <typename Iterator>
+        class getting_iter : public Iterator {
+        public:
+            using value_type = decltype((*std::declval<Iterator>()).get());
+ using difference_type = typename std::iterator_traits::difference_type;
+ using pointer = value_type*;
+ using reference = value_type&;
+ explicit getting_iter(Iterator iter) : iter(iter) {}
+
+ getting_iter operator++() { ++iter; return *this; }
+ getting_iter operator++(int) { getting_iter ret(*this); ++iter; return ret; }
+
+ value_type operator*() { return (*iter).get(); }
+ value_type operator[](difference_type offset) { return iter[offset].get(); }
+
+ bool operator==(const getting_iter &other) const { return iter == other.iter; }
+ bool operator!=(const getting_iter &other) const { return iter != other.iter; }
+
+ protected:
+ Iterator iter;
+ };
+
+        template <typename Iterator>
+        getting_iter<Iterator> get_wrap(Iterator iter) {
+            return getting_iter<Iterator>(iter);
+        }
+
+        template <typename Container>
+ void get_futures(Container& futures) {
+ for (auto &future: futures) {
+ future.get();
+ }
+ }
+
+ /**
+ * Identify a pivot element for quicksort. Chooses the middle element of the range.
+ */
+        template <typename Iterator>
+        typename std::iterator_traits<Iterator>::value_type quicksort_pivot(Iterator first, Iterator last) {
+ return *(std::next(first, std::distance(first, last) / 2));
+ }
+
+ /**
+ * Predicate for std::partition (for quicksort)
+ */
+        template <typename T, typename Compare>
+        struct pivot_predicate {
+ pivot_predicate(Compare comp, const T& pivot) : comp(comp), pivot(pivot) {}
+
+ bool operator()(const T& em) {
+ return comp(em, pivot);
+ }
+ Compare comp;
+ const T pivot;
+ };
+
+ /*
+ * Some methods are only available with C++17 and up. Reimplement on older standards.
+ */
+#if POOLSTL_HAVE_CXX17_LIB
+ namespace cpp17 = std;
+#else
+ namespace cpp17 {
+
+ // std::reduce
+
+            template <class InputIt, class Tp, class BinOp>
+            Tp reduce(InputIt first, InputIt last, Tp init, BinOp b) {
+                for (; first != last; ++first)
+                    init = b(init, *first);
+                return init;
+            }
+
+            template <class InputIt>
+            typename std::iterator_traits<InputIt>::value_type reduce(InputIt first, InputIt last) {
+                return reduce(first, last,
+                              typename std::iterator_traits<InputIt>::value_type{},
+                              std::plus<typename std::iterator_traits<InputIt>::value_type>());
+            }
+
+ // std::transform
+
+            template <class InputIt, class OutputIt, class UnaryOperation>
+ OutputIt transform(InputIt first1, InputIt last1, OutputIt d_first,
+ UnaryOperation unary_op) {
+ while (first1 != last1) {
+ *d_first++ = unary_op(*first1++);
+ }
+
+ return d_first;
+ }
+
+            template <class InputIt1, class InputIt2, class OutputIt, class BinaryOperation>
+ OutputIt transform(InputIt1 first1, InputIt1 last1,
+ InputIt2 first2, OutputIt d_first,
+ BinaryOperation binary_op) {
+ while (first1 != last1) {
+ *d_first++ = binary_op(*first1++, *first2++);
+ }
+
+ return d_first;
+ }
+ }
+#endif
+ }
+}
+
+#endif
+
+namespace poolstl {
+
+ namespace ttp = task_thread_pool;
+
+ namespace execution {
+ namespace internal {
+ /**
+ * Holds the thread pool used by par.
+ */
+            inline std::shared_ptr<ttp::task_thread_pool> get_default_pool() {
+                static std::shared_ptr<ttp::task_thread_pool> pool;
+                static std::once_flag flag;
+                std::call_once(flag, [&](){ pool = std::make_shared<ttp::task_thread_pool>(); });
+                return pool;
+            }
+ }
+
+ /**
+ * Base class for all poolSTL policies.
+ */
+ struct poolstl_policy {
+ };
+
+ /**
+ * A sequential policy that simply forwards to the non-policy overload.
+ */
+ struct sequenced_policy : public poolstl_policy {
+ POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const {
+ // never called, but must exist for C++11 support
+ throw std::runtime_error("poolSTL: requested thread pool for seq policy.");
+ }
+
+ POOLSTL_NO_DISCARD bool par_allowed() const {
+ return false;
+ }
+ };
+
+ /**
+ * A parallel policy that can use a user-specified thread pool or a default one.
+ */
+ struct parallel_policy : public poolstl_policy {
+ parallel_policy() = default;
+ explicit parallel_policy(ttp::task_thread_pool* on_pool, bool par_ok): on_pool(on_pool), par_ok(par_ok) {}
+
+ parallel_policy on(ttp::task_thread_pool& pool) const {
+ return parallel_policy{&pool, par_ok};
+ }
+
+ parallel_policy par_if(bool call_par) const {
+ return parallel_policy{on_pool, call_par};
+ }
+
+ POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const {
+ if (on_pool) {
+ return on_pool;
+ } else {
+ return internal::get_default_pool().get();
+ }
+ }
+
+ POOLSTL_NO_DISCARD bool par_allowed() const {
+ return par_ok;
+ }
+
+ protected:
+ ttp::task_thread_pool *on_pool = nullptr;
+ bool par_ok = true;
+ };
+
+ constexpr sequenced_policy seq{};
+ constexpr parallel_policy par{};
+
+ /**
+ * EXPERIMENTAL: Subject to significant changes or removal.
+ * Use pure threads for each operation instead of a shared thread pool.
+ *
+ * Advantage:
+ * - Fewer symbols (no packaged_task with its operators, destructors, vtable, etc) means smaller binary
+ * which can mean a lot when there are many calls.
+ * - No thread pool to manage.
+ *
+ * Disadvantages:
+ * - Threads are started and joined for every operation, so it is harder to amortize that cost.
+ * - Barely any algorithms are supported.
+ */
+ struct pure_threads_policy : public poolstl_policy {
+ explicit pure_threads_policy(unsigned int num_threads, bool par_ok): num_threads(num_threads),
+ par_ok(par_ok) {}
+
+ POOLSTL_NO_DISCARD unsigned int get_num_threads() const {
+ if (num_threads == 0) {
+ return std::thread::hardware_concurrency();
+ }
+ return num_threads;
+ }
+
+ POOLSTL_NO_DISCARD bool par_allowed() const {
+ return par_ok;
+ }
+
+ protected:
+ unsigned int num_threads = 1;
+ bool par_ok = true;
+ };
+
+ /**
+ * Choose parallel or sequential at runtime.
+ *
+ * @param call_par Whether to use a parallel policy.
+ * @return `par` if call_par is true, else a sequential policy (like `seq`).
+ */
+ inline parallel_policy par_if(bool call_par) {
+ return parallel_policy{nullptr, call_par};
+ }
+
+ /**
+ * Choose parallel or sequential at runtime, with pool selection.
+ *
+ * @param call_par Whether to use a parallel policy.
+ * @return `par.on(pool)` if call_par is true, else a sequential policy (like `seq`).
+ */
+ inline parallel_policy par_if(bool call_par, ttp::task_thread_pool& pool) {
+ return parallel_policy{&pool, call_par};
+ }
+
+ /**
+ * EXPERIMENTAL: Subject to significant changes or removal. See `pure_threads_policy`.
+ * Choose parallel or sequential at runtime, with thread count selection.
+ *
+ * @param call_par Whether to use a parallel policy.
+ * @return `par.on(pool)` if call_par is true, else `seq`.
+ */
+ inline pure_threads_policy par_if_threads(bool call_par, unsigned int num_threads) {
+ return pure_threads_policy{num_threads, call_par};
+ }
+ }
+
+ using execution::seq;
+ using execution::par;
+ using execution::par_if;
+
+ namespace internal {
+ /**
+ * To enable/disable seq overload resolution
+ */
+        template <class ExecPolicy, class Tp>
+        using enable_if_seq =
+            typename std::enable_if<
+                std::is_same<poolstl::execution::sequenced_policy,
+                    typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>::value,
+                Tp>::type;
+
+        /**
+         * To enable/disable par overload resolution
+         */
+        template <class ExecPolicy, class Tp>
+        using enable_if_par =
+            typename std::enable_if<
+                std::is_same<poolstl::execution::parallel_policy,
+                    typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>::value,
+                Tp>::type;
+
+        /**
+         * To enable/disable par overload resolution
+         */
+        template <class ExecPolicy, class Tp>
+        using enable_if_poolstl_policy =
+            typename std::enable_if<
+                std::is_base_of<poolstl::execution::poolstl_policy,
+                    typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>::value,
+                Tp>::type;
+
+        template <class ExecPolicy>
+        bool is_seq(const ExecPolicy& policy) {
+            return !policy.par_allowed();
+        }
+
+        template <class ExecPolicy>
+        using is_pure_threads_policy = std::is_same<poolstl::execution::pure_threads_policy,
+            typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>;
+ }
+}
+
+#endif
+
+#ifndef POOLSTL_ALGORITHM_HPP
+#define POOLSTL_ALGORITHM_HPP
+
+#include <functional>
+
+
+#ifndef POOLSTL_INTERNAL_TTP_IMPL_HPP
+#define POOLSTL_INTERNAL_TTP_IMPL_HPP
+
+#include <algorithm>
+#include <cmath>
+#include <utility>
+#include <vector>
+
+
+#ifndef POOLSTL_EXECUTION_HPP
+#define POOLSTL_EXECUTION_HPP
+
+#include <memory>
+#include <mutex>
+#include <stdexcept>
+#include <thread>
+
+
+#ifndef AL_TASK_THREAD_POOL_HPP
+#define AL_TASK_THREAD_POOL_HPP
+
+// Version macros.
+#define TASK_THREAD_POOL_VERSION_MAJOR 1
+#define TASK_THREAD_POOL_VERSION_MINOR 0
+#define TASK_THREAD_POOL_VERSION_PATCH 10
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <type_traits>
+
+// MSVC does not correctly set the __cplusplus macro by default, so we must read it from _MSVC_LANG
+// See https://devblogs.microsoft.com/cppblog/msvc-now-correctly-reports-__cplusplus/
+#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)
+#define TTP_CXX17 1
+#else
+#define TTP_CXX17 0
+#endif
+
+#if TTP_CXX17
+#define TTP_NODISCARD [[nodiscard]]
+#else
+#define TTP_NODISCARD
+#endif
+
+namespace task_thread_pool {
+
+#if !TTP_CXX17
+ /**
+ * A reimplementation of std::decay_t, which is only available since C++14.
+ */
+    template <class T>
+    using decay_t = typename std::decay<T>::type;
+#endif
+
+ /**
+ * A fast and lightweight thread pool that uses C++11 threads.
+ */
+ class task_thread_pool {
+ public:
+ /**
+ * Create a task_thread_pool and start worker threads.
+ *
+ * @param num_threads Number of worker threads. If 0 then number of threads is equal to the
+ * number of physical cores on the machine, as given by std::thread::hardware_concurrency().
+ */
+ explicit task_thread_pool(unsigned int num_threads = 0) {
+ if (num_threads < 1) {
+ num_threads = std::thread::hardware_concurrency();
+ if (num_threads < 1) { num_threads = 1; }
+ }
+ start_threads(num_threads);
+ }
+
+ /**
+ * Finish all tasks left in the queue then shut down worker threads.
+ * If the pool is currently paused then it is resumed.
+ */
+ ~task_thread_pool() {
+ unpause();
+ wait_for_queued_tasks();
+ stop_all_threads();
+ }
+
+ /**
+ * Drop all tasks that have been submitted but not yet started by a worker.
+ *
+ * Tasks already in progress continue executing.
+ */
+ void clear_task_queue() {
+ const std::lock_guard tasks_lock(task_mutex);
+ tasks = {};
+ }
+
+ /**
+ * Get number of enqueued tasks.
+ *
+ * @return Number of tasks that have been enqueued but not yet started.
+ */
+ TTP_NODISCARD size_t get_num_queued_tasks() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return tasks.size();
+ }
+
+ /**
+ * Get number of in-progress tasks.
+ *
+ * @return Approximate number of tasks currently being processed by worker threads.
+ */
+ TTP_NODISCARD size_t get_num_running_tasks() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return num_inflight_tasks;
+ }
+
+ /**
+ * Get total number of tasks in the pool.
+ *
+ * @return Approximate number of tasks both enqueued and running.
+ */
+ TTP_NODISCARD size_t get_num_tasks() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return tasks.size() + num_inflight_tasks;
+ }
+
+ /**
+ * Get number of worker threads.
+ *
+ * @return Number of worker threads.
+ */
+ TTP_NODISCARD unsigned int get_num_threads() const {
+ const std::lock_guard threads_lock(thread_mutex);
+ return static_cast(threads.size());
+ }
+
+ /**
+ * Set number of worker threads. Will start or stop worker threads as necessary.
+ *
+ * @param num_threads Number of worker threads. If 0 then number of threads is equal to the
+ * number of physical cores on the machine, as given by std::thread::hardware_concurrency().
+ * @return Previous number of worker threads.
+ */
+ unsigned int set_num_threads(unsigned int num_threads) {
+ const std::lock_guard threads_lock(thread_mutex);
+ unsigned int previous_num_threads = get_num_threads();
+
+ if (num_threads < 1) {
+ num_threads = std::thread::hardware_concurrency();
+ if (num_threads < 1) { num_threads = 1; }
+ }
+
+ if (previous_num_threads <= num_threads) {
+ // expanding the thread pool
+ start_threads(num_threads - previous_num_threads);
+ } else {
+ // contracting the thread pool
+ stop_all_threads();
+ {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_running = true;
+ }
+ start_threads(num_threads);
+ }
+
+ return previous_num_threads;
+ }
+
+ /**
+ * Stop executing queued tasks. Use `unpause()` to resume. Note: Destroying the pool will implicitly unpause.
+ *
+ * Any in-progress tasks continue executing.
+ */
+ void pause() {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_paused = true;
+ }
+
+ /**
+ * Resume executing queued tasks.
+ */
+ void unpause() {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_paused = false;
+ task_cv.notify_all();
+ }
+
+ /**
+ * Check whether the pool is paused.
+ *
+ * @return true if pause() has been called without an intervening unpause().
+ */
+ TTP_NODISCARD bool is_paused() const {
+ const std::lock_guard tasks_lock(task_mutex);
+ return pool_paused;
+ }
+
+ /**
+ * Submit a Callable for the pool to execute and return a std::future.
+ *
+ * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc.
+ * @param args Arguments for func. Optional.
+ * @return std::future that can be used to get func's return value or thrown exception.
+ */
+ template , std::decay_t...>
+#else
+ typename R = typename std::result_of(decay_t...)>::type
+#endif
+ >
+ TTP_NODISCARD std::future submit(F&& func, A&&... args) {
+#if defined(_MSC_VER)
+ // MSVC's packaged_task is not movable even though it should be.
+ // Discussion about this bug and its future fix:
+ // https://developercommunity.visualstudio.com/t/unable-to-move-stdpackaged-task-into-any-stl-conta/108672
+ std::shared_ptr> ptask =
+ std::make_shared>(std::bind(std::forward(func), std::forward(args)...));
+ submit_detach([ptask] { (*ptask)(); });
+ return ptask->get_future();
+#else
+ std::packaged_task task(std::bind(std::forward(func), std::forward(args)...));
+ auto ret = task.get_future();
+ submit_detach(std::move(task));
+ return ret;
+#endif
+ }
+
+ /**
+ * Submit a zero-argument Callable for the pool to execute.
+ *
+ * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc.
+ */
+ template
+ void submit_detach(F&& func) {
+ const std::lock_guard tasks_lock(task_mutex);
+ tasks.emplace(std::forward(func));
+ task_cv.notify_one();
+ }
+
+ /**
+ * Submit a Callable with arguments for the pool to execute.
+ *
+ * @param func The Callable to execute. Can be a function, a lambda, std::packaged_task, std::function, etc.
+ */
+ template
+ void submit_detach(F&& func, A&&... args) {
+ const std::lock_guard tasks_lock(task_mutex);
+ tasks.emplace(std::bind(std::forward(func), std::forward(args)...));
+ task_cv.notify_one();
+ }
+
+ /**
+ * Block until the task queue is empty. Some tasks may be in-progress when this method returns.
+ */
+ void wait_for_queued_tasks() {
+ std::unique_lock tasks_lock(task_mutex);
+ notify_task_finish = true;
+ task_finished_cv.wait(tasks_lock, [&] { return tasks.empty(); });
+ notify_task_finish = false;
+ }
+
+ /**
+ * Block until all tasks have finished.
+ */
+ void wait_for_tasks() {
+ std::unique_lock tasks_lock(task_mutex);
+ notify_task_finish = true;
+ task_finished_cv.wait(tasks_lock, [&] { return tasks.empty() && num_inflight_tasks == 0; });
+ notify_task_finish = false;
+ }
+
+ protected:
+
+ /**
+ * Main function for worker threads.
+ */
+ void worker_main() {
+ bool finished_task = false;
+
+ while (true) {
+ std::unique_lock tasks_lock(task_mutex);
+
+ if (finished_task) {
+ --num_inflight_tasks;
+ if (notify_task_finish) {
+ task_finished_cv.notify_all();
+ }
+ }
+
+ task_cv.wait(tasks_lock, [&]() { return !pool_running || (!pool_paused && !tasks.empty()); });
+
+ if (!pool_running) {
+ break;
+ }
+
+ // Must mean that (!pool_paused && !tasks.empty()) is true
+
+ std::packaged_task task{std::move(tasks.front())};
+ tasks.pop();
+ ++num_inflight_tasks;
+ tasks_lock.unlock();
+
+ try {
+ task();
+ } catch (...) {
+ // std::packaged_task::operator() may throw in some error conditions, such as if the task
+ // had already been run. Nothing that the pool can do anything about.
+ }
+
+ finished_task = true;
+ }
+ }
+
+ /**
+ * Start worker threads.
+ *
+ * @param num_threads How many threads to start.
+ */
+ void start_threads(const unsigned int num_threads) {
+ const std::lock_guard threads_lock(thread_mutex);
+
+ for (unsigned int i = 0; i < num_threads; ++i) {
+ threads.emplace_back(&task_thread_pool::worker_main, this);
+ }
+ }
+
+ /**
+ * Stop, join, and destroy all worker threads.
+ */
+ void stop_all_threads() {
+ const std::lock_guard threads_lock(thread_mutex);
+
+ {
+ const std::lock_guard tasks_lock(task_mutex);
+ pool_running = false;
+ task_cv.notify_all();
+ }
+
+ for (auto& thread : threads) {
+ if (thread.joinable()) {
+ thread.join();
+ }
+ }
+ threads.clear();
+ }
+
+ /**
+ * The worker threads.
+ *
+ * Access protected by thread_mutex
+ */
+ std::vector threads;
+
+ /**
+ * A mutex for methods that start/stop threads.
+ */
+ mutable std::recursive_mutex thread_mutex;
+
+ /**
+ * The task queue.
+ *
+ * Access protected by task_mutex.
+ */
+ std::queue> tasks = {};
+
+ /**
+ * A mutex for all variables related to tasks.
+ */
+ mutable std::mutex task_mutex;
+
+ /**
+ * Used to notify changes to the task queue, such as a new task added, pause/unpause, etc.
+ */
+ std::condition_variable task_cv;
+
+ /**
+ * Used to notify of finished tasks.
+ */
+ std::condition_variable task_finished_cv;
+
+ /**
+ * A signal for worker threads that the pool is either running or shutting down.
+ *
+ * Access protected by task_mutex.
+ */
+ bool pool_running = true;
+
+ /**
+ * A signal for worker threads to not pull new tasks from the queue.
+ *
+ * Access protected by task_mutex.
+ */
+ bool pool_paused = false;
+
+ /**
+ * A signal for worker threads that they should notify task_finished_cv when they finish a task.
+ *
+ * Access protected by task_mutex.
+ */
+ bool notify_task_finish = false;
+
+ /**
+ * A counter of the number of tasks in-progress by worker threads.
+ * Incremented when a task is popped off the task queue and decremented when that task is complete.
+ *
+ * Access protected by task_mutex.
+ */
+ int num_inflight_tasks = 0;
+ };
+}
+
+// clean up
+#undef TTP_NODISCARD
+#undef TTP_CXX17
+
+#endif
+
+#ifndef POOLSTL_INTERNAL_UTILS_HPP
+#define POOLSTL_INTERNAL_UTILS_HPP
+
+// Version macros.
+#define POOLSTL_VERSION_MAJOR 0
+#define POOLSTL_VERSION_MINOR 3
+#define POOLSTL_VERSION_PATCH 5
+
+#include <cstddef>
+#include <iterator>
+
+// MSVC reports C++17 support via _MSVC_LANG rather than __cplusplus.
+#if __cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)
+#define POOLSTL_HAVE_CXX17 1
+#define POOLSTL_NO_DISCARD [[nodiscard]]
+#else
+#define POOLSTL_HAVE_CXX17 0
+#define POOLSTL_NO_DISCARD
+#endif
+
+// C++17 library features (std::apply, std::reduce, ...) require libstdc++ 9 or newer.
+#if POOLSTL_HAVE_CXX17 && (!defined(_GLIBCXX_RELEASE) || _GLIBCXX_RELEASE >= 9)
+#define POOLSTL_HAVE_CXX17_LIB 1
+#else
+#define POOLSTL_HAVE_CXX17_LIB 0
+#endif
+
+#if __cplusplus >= 201402L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201402L)
+#define POOLSTL_HAVE_CXX14 1
+#else
+#define POOLSTL_HAVE_CXX14 0
+#endif
+
+namespace poolstl {
+    namespace internal {
+
+        inline constexpr std::size_t get_chunk_size(std::size_t num_steps, unsigned int num_threads) {
+            return (num_steps / num_threads) + ((num_steps % num_threads) > 0 ? 1 : 0);
+        }
+
+        template <typename Iterator>
+        constexpr typename std::iterator_traits<Iterator>::difference_type
+        get_chunk_size(Iterator first, Iterator last, unsigned int num_threads) {
+            using diff_t = typename std::iterator_traits<Iterator>::difference_type;
+            return static_cast<diff_t>(get_chunk_size((std::size_t)std::distance(first, last), num_threads));
+        }
+
+        template <typename Iterator>
+        constexpr typename std::iterator_traits<Iterator>::difference_type
+        get_iter_chunk_size(const Iterator& iter, const Iterator& last,
+                            typename std::iterator_traits<Iterator>::difference_type chunk_size) {
+            return std::min(chunk_size, std::distance(iter, last));
+        }
+
+        template <typename Iterator>
+        Iterator advanced(Iterator iter, typename std::iterator_traits<Iterator>::difference_type offset) {
+            Iterator ret = iter;
+            std::advance(ret, offset);
+            return ret;
+        }
+
+        /**
+         * An iterator wrapper that calls std::future<>::get().
+         * @tparam Iterator
+         */
+        template <typename Iterator>
+        class getting_iter : public Iterator {
+        public:
+            using value_type = decltype((*std::declval<Iterator>()).get());
+            using difference_type = typename std::iterator_traits<Iterator>::difference_type;
+            using pointer = value_type*;
+            using reference = value_type&;
+            explicit getting_iter(Iterator iter) : iter(iter) {}
+
+            getting_iter operator++() { ++iter; return *this; }
+            getting_iter operator++(int) { getting_iter ret(*this); ++iter; return ret; }
+
+            value_type operator*() { return (*iter).get(); }
+            value_type operator[](difference_type offset) { return iter[offset].get(); }
+
+            bool operator==(const getting_iter<Iterator> &other) const { return iter == other.iter; }
+            bool operator!=(const getting_iter<Iterator> &other) const { return iter != other.iter; }
+
+        protected:
+            Iterator iter;
+        };
+
+        template <typename Iterator>
+        getting_iter<Iterator> get_wrap(Iterator iter) {
+            return getting_iter<Iterator>(iter);
+        }
+
+        template <class Container>
+        void get_futures(Container& futures) {
+            for (auto &future: futures) {
+                future.get();
+            }
+        }
+
+        /**
+         * Identify a pivot element for quicksort. Chooses the middle element of the range.
+         */
+        template <typename Iterator>
+        typename std::iterator_traits<Iterator>::value_type quicksort_pivot(Iterator first, Iterator last) {
+            return *(std::next(first, std::distance(first, last) / 2));
+        }
+
+        /**
+         * Predicate for std::partition (for quicksort)
+         */
+        template <class Compare, class T>
+        struct pivot_predicate {
+            pivot_predicate(Compare comp, const T& pivot) : comp(comp), pivot(pivot) {}
+
+            bool operator()(const T& em) {
+                return comp(em, pivot);
+            }
+            Compare comp;
+            const T pivot;
+        };
+
+        /*
+         * Some methods are only available with C++17 and up. Reimplement on older standards.
+         */
+#if POOLSTL_HAVE_CXX17_LIB
+        namespace cpp17 = std;
+#else
+        namespace cpp17 {
+
+            // std::reduce
+
+            template <class InputIt, class Tp, class BinOp>
+            Tp reduce(InputIt first, InputIt last, Tp init, BinOp b) {
+                for (; first != last; ++first)
+                    init = b(init, *first);
+                return init;
+            }
+
+            template <class InputIt>
+            typename std::iterator_traits<InputIt>::value_type reduce(InputIt first, InputIt last) {
+                return reduce(first, last,
+                              typename std::iterator_traits<InputIt>::value_type{},
+                              std::plus<typename std::iterator_traits<InputIt>::value_type>());
+            }
+
+            // std::transform
+
+            template <class InputIt, class OutputIt, class UnaryOperation>
+            OutputIt transform(InputIt first1, InputIt last1, OutputIt d_first,
+                               UnaryOperation unary_op) {
+                while (first1 != last1) {
+                    *d_first++ = unary_op(*first1++);
+                }
+
+                return d_first;
+            }
+
+            template <class InputIt1, class InputIt2, class OutputIt, class BinaryOperation>
+            OutputIt transform(InputIt1 first1, InputIt1 last1,
+                               InputIt2 first2, OutputIt d_first,
+                               BinaryOperation binary_op) {
+                while (first1 != last1) {
+                    *d_first++ = binary_op(*first1++, *first2++);
+                }
+
+                return d_first;
+            }
+        }
+#endif
+    }
+}
+
+#endif
+
+namespace poolstl {
+
+    namespace ttp = task_thread_pool;
+
+    namespace execution {
+        namespace internal {
+            /**
+             * Holds the thread pool used by par.
+             */
+            inline std::shared_ptr<ttp::task_thread_pool> get_default_pool() {
+                static std::shared_ptr<ttp::task_thread_pool> pool;
+                static std::once_flag flag;
+                std::call_once(flag, [&](){ pool = std::make_shared<ttp::task_thread_pool>(); });
+                return pool;
+            }
+        }
+
+        /**
+         * Base class for all poolSTL policies.
+         */
+        struct poolstl_policy {
+        };
+
+        /**
+         * A sequential policy that simply forwards to the non-policy overload.
+         */
+        struct sequenced_policy : public poolstl_policy {
+            POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const {
+                // never called, but must exist for C++11 support
+                throw std::runtime_error("poolSTL: requested thread pool for seq policy.");
+            }
+
+            POOLSTL_NO_DISCARD bool par_allowed() const {
+                return false;
+            }
+        };
+
+        /**
+         * A parallel policy that can use a user-specified thread pool or a default one.
+         */
+        struct parallel_policy : public poolstl_policy {
+            parallel_policy() = default;
+            explicit parallel_policy(ttp::task_thread_pool* on_pool, bool par_ok): on_pool(on_pool), par_ok(par_ok) {}
+
+            parallel_policy on(ttp::task_thread_pool& pool) const {
+                return parallel_policy{&pool, par_ok};
+            }
+
+            parallel_policy par_if(bool call_par) const {
+                return parallel_policy{on_pool, call_par};
+            }
+
+            POOLSTL_NO_DISCARD ttp::task_thread_pool* pool() const {
+                if (on_pool) {
+                    return on_pool;
+                } else {
+                    return internal::get_default_pool().get();
+                }
+            }
+
+            POOLSTL_NO_DISCARD bool par_allowed() const {
+                return par_ok;
+            }
+
+        protected:
+            ttp::task_thread_pool *on_pool = nullptr;
+            bool par_ok = true;
+        };
+
+        constexpr sequenced_policy seq{};
+        constexpr parallel_policy par{};
+
+        /**
+         * EXPERIMENTAL: Subject to significant changes or removal.
+         * Use pure threads for each operation instead of a shared thread pool.
+         *
+         * Advantage:
+         * - Fewer symbols (no packaged_task with its operators, destructors, vtable, etc) means smaller binary
+         *   which can mean a lot when there are many calls.
+         * - No thread pool to manage.
+         *
+         * Disadvantages:
+         * - Threads are started and joined for every operation, so it is harder to amortize that cost.
+         * - Barely any algorithms are supported.
+         */
+        struct pure_threads_policy : public poolstl_policy {
+            explicit pure_threads_policy(unsigned int num_threads, bool par_ok): num_threads(num_threads),
+                                                                                par_ok(par_ok) {}
+
+            POOLSTL_NO_DISCARD unsigned int get_num_threads() const {
+                if (num_threads == 0) {
+                    return std::thread::hardware_concurrency();
+                }
+                return num_threads;
+            }
+
+            POOLSTL_NO_DISCARD bool par_allowed() const {
+                return par_ok;
+            }
+
+        protected:
+            unsigned int num_threads = 1;
+            bool par_ok = true;
+        };
+
+        /**
+         * Choose parallel or sequential at runtime.
+         *
+         * @param call_par Whether to use a parallel policy.
+         * @return `par` if call_par is true, else a sequential policy (like `seq`).
+         */
+        inline parallel_policy par_if(bool call_par) {
+            return parallel_policy{nullptr, call_par};
+        }
+
+        /**
+         * Choose parallel or sequential at runtime, with pool selection.
+         *
+         * @param call_par Whether to use a parallel policy.
+         * @return `par.on(pool)` if call_par is true, else a sequential policy (like `seq`).
+         */
+        inline parallel_policy par_if(bool call_par, ttp::task_thread_pool& pool) {
+            return parallel_policy{&pool, call_par};
+        }
+
+        /**
+         * EXPERIMENTAL: Subject to significant changes or removal. See `pure_threads_policy`.
+         * Choose parallel or sequential at runtime, with thread count selection.
+         *
+         * @param call_par Whether to use a parallel policy.
+         * @return `par.on(pool)` if call_par is true, else `seq`.
+         */
+        inline pure_threads_policy par_if_threads(bool call_par, unsigned int num_threads) {
+            return pure_threads_policy{num_threads, call_par};
+        }
+    }
+
+    using execution::seq;
+    using execution::par;
+    using execution::par_if;
+
+    namespace internal {
+        /**
+         * To enable/disable seq overload resolution
+         */
+        template <class ExecPolicy, class Tp>
+        using enable_if_seq =
+            typename std::enable_if<
+                std::is_same<poolstl::execution::sequenced_policy,
+                    typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>::value,
+                Tp>::type;
+
+        /**
+         * To enable/disable par overload resolution
+         */
+        template <class ExecPolicy, class Tp>
+        using enable_if_par =
+            typename std::enable_if<
+                std::is_same<poolstl::execution::parallel_policy,
+                    typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>::value,
+                Tp>::type;
+
+        /**
+         * To enable/disable overload resolution for any poolSTL policy (seq, par, pure threads, ...)
+         */
+        template <class ExecPolicy, class Tp>
+        using enable_if_poolstl_policy =
+            typename std::enable_if<
+                std::is_base_of<poolstl::execution::poolstl_policy,
+                    typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>::value,
+                Tp>::type;
+
+        template <class ExecPolicy>
+        bool is_seq(const ExecPolicy& policy) {
+            return !policy.par_allowed();
+        }
+
+        template <class ExecPolicy>
+        using is_pure_threads_policy = std::is_same<poolstl::execution::pure_threads_policy,
+            typename std::remove_cv<typename std::remove_reference<ExecPolicy>::type>::type>;
+    }
+}
+
+#endif
+
+namespace poolstl {
+    namespace internal {
+
+#if POOLSTL_HAVE_CXX17_LIB
+        /**
+         * Call std::apply in parallel.
+         */
+        template <class ExecPolicy, class Op, class ArgContainer>
+        std::vector<std::future<void>>
+        parallel_apply(ExecPolicy &&policy, Op op, const ArgContainer& args_list) {
+            std::vector<std::future<void>> futures;
+            auto& task_pool = *policy.pool();
+
+            for (const auto& args : args_list) {
+                futures.emplace_back(task_pool.submit([](Op op, const auto& args_fwd) {
+                    std::apply(op, args_fwd);
+                }, op, args));
+            }
+
+            return futures;
+        }
+#endif
+
+        /**
+         * Chunk a single range, with autodetected return types.
+         */
+        template <class ExecPolicy, class RandIt, class Chunk,
+                  class ChunkRet = decltype(std::declval<Chunk>()(std::declval<RandIt>(), std::declval<RandIt>()))>
+        std::vector<std::future<ChunkRet>>
+        parallel_chunk_for_gen(ExecPolicy &&policy, RandIt first, RandIt last, Chunk chunk,
+                               ChunkRet* = (decltype(std::declval<Chunk>()(std::declval<RandIt>(),
+                                                                           std::declval<RandIt>()))*)nullptr,
+                               int extra_split_factor = 1) {
+            std::vector<std::future<ChunkRet>> futures;
+            auto& task_pool = *policy.pool();
+            auto chunk_size = get_chunk_size(first, last, extra_split_factor * task_pool.get_num_threads());
+
+            while (first < last) {
+                auto iter_chunk_size = get_iter_chunk_size(first, last, chunk_size);
+                RandIt loop_end = advanced(first, iter_chunk_size);
+
+                futures.emplace_back(task_pool.submit(chunk, first, loop_end));
+
+                first = loop_end;
+            }
+
+            return futures;
+        }
+
+        /**
+         * Chunk a single range.
+         */
+        template <class ExecPolicy, class RandIt, class Chunk, class ChunkRet, typename... A>
+        std::vector<std::future<ChunkRet>>
+        parallel_chunk_for_1(ExecPolicy &&policy, RandIt first, RandIt last,
+                             Chunk chunk, ChunkRet*, int extra_split_factor, A&&... chunk_args) {
+            std::vector<std::future<ChunkRet>> futures;
+            auto& task_pool = *policy.pool();
+            auto chunk_size = get_chunk_size(first, last, extra_split_factor * task_pool.get_num_threads());
+
+            while (first < last) {
+                auto iter_chunk_size = get_iter_chunk_size(first, last, chunk_size);
+                RandIt loop_end = advanced(first, iter_chunk_size);
+
+                futures.emplace_back(task_pool.submit(chunk, first, loop_end, chunk_args...));
+
+                first = loop_end;
+            }
+
+            return futures;
+        }
+
+        /**
+         * Chunk a single range and wait for all chunks to complete (thread pool version).
+         */
+        template <class ExecPolicy, class RandIt, class Chunk, class ChunkRet, typename... A>
+        typename std::enable_if<!is_pure_threads_policy<ExecPolicy>::value, void>::type
+        parallel_chunk_for_1_wait(ExecPolicy &&policy, RandIt first, RandIt last,
+                                  Chunk chunk, ChunkRet* rettype, int extra_split_factor, A&&... chunk_args) {
+            auto futures = parallel_chunk_for_1(std::forward<ExecPolicy>(policy), first, last,
+                                                chunk, rettype, extra_split_factor, chunk_args...);
+            get_futures(futures);
+        }
+
+        /**
+         * Element-wise chunk two ranges.
+         */
+        template <class ExecPolicy, class RandIt1, class RandIt2, class Chunk, class ChunkRet, typename... A>
+        std::vector<std::future<ChunkRet>>
+        parallel_chunk_for_2(ExecPolicy &&policy, RandIt1 first1, RandIt1 last1, RandIt2 first2,
+                             Chunk chunk, ChunkRet*, A&&... chunk_args) {
+            std::vector<std::future<ChunkRet>> futures;
+            auto& task_pool = *policy.pool();
+            auto chunk_size = get_chunk_size(first1, last1, task_pool.get_num_threads());
+
+            while (first1 < last1) {
+                auto iter_chunk_size = get_iter_chunk_size(first1, last1, chunk_size);
+                RandIt1 loop_end = advanced(first1, iter_chunk_size);
+
+                futures.emplace_back(task_pool.submit(chunk, first1, loop_end, first2, chunk_args...));
+
+                first1 = loop_end;
+                std::advance(first2, iter_chunk_size);
+            }
+
+            return futures;
+        }
+
+        /**
+         * Element-wise chunk three ranges.
+         */
+        template <class ExecPolicy, class RandIt1, class RandIt2, class RandIt3,
+                  class Chunk, class ChunkRet, typename... A>
+        std::vector<std::future<ChunkRet>>
+        parallel_chunk_for_3(ExecPolicy &&policy, RandIt1 first1, RandIt1 last1, RandIt2 first2, RandIt3 first3,
+                             Chunk chunk, ChunkRet*, A&&... chunk_args) {
+            std::vector<std::future<ChunkRet>> futures;
+            auto& task_pool = *policy.pool();
+            auto chunk_size = get_chunk_size(first1, last1, task_pool.get_num_threads());
+
+            while (first1 < last1) {
+                auto iter_chunk_size = get_iter_chunk_size(first1, last1, chunk_size);
+                RandIt1 loop_end = advanced(first1, iter_chunk_size);
+
+                futures.emplace_back(task_pool.submit(chunk, first1, loop_end, first2, first3, chunk_args...));
+
+                first1 = loop_end;
+                std::advance(first2, iter_chunk_size);
+                std::advance(first3, iter_chunk_size);
+            }
+
+            return futures;
+        }
+
+        /**
+         * Sort a range in parallel.
+         *
+         * @param sort_func Sequential sort method, like std::sort or std::stable_sort
+         * @param merge_func Sequential merge method, like std::inplace_merge
+         */
+        template <class ExecPolicy, class RandIt, class Compare, class SortFunc, class MergeFunc>
+        void parallel_mergesort(ExecPolicy &&policy, RandIt first, RandIt last,
+                                Compare comp, SortFunc sort_func, MergeFunc merge_func) {
+            if (first == last) {
+                return;
+            }
+
+            // Sort chunks in parallel
+            auto futures = parallel_chunk_for_gen(std::forward<ExecPolicy>(policy), first, last,
+                                                  [&comp, sort_func] (RandIt chunk_first, RandIt chunk_last) {
+                                                      sort_func(chunk_first, chunk_last, comp);
+                                                      return std::make_pair(chunk_first, chunk_last);
+                                                  });
+
+            // Merge the sorted ranges
+            using SortedRange = std::pair<RandIt, RandIt>;
+            auto& task_pool = *policy.pool();
+            std::vector<SortedRange> subranges;
+            do {
+                for (auto& future : futures) {
+                    subranges.emplace_back(future.get());
+                }
+                futures.clear();
+
+                for (std::size_t i = 0; i < subranges.size(); ++i) {
+                    if (i + 1 < subranges.size()) {
+                        // pair up and merge
+                        auto& lhs = subranges[i];
+                        auto& rhs = subranges[i + 1];
+                        futures.emplace_back(task_pool.submit([&comp, merge_func] (RandIt chunk_first,
+                                                                                   RandIt chunk_middle,
+                                                                                   RandIt chunk_last) {
+                            merge_func(chunk_first, chunk_middle, chunk_last, comp);
+                            return std::make_pair(chunk_first, chunk_last);
+                        }, lhs.first, lhs.second, rhs.second));
+                        ++i;
+                    } else {
+                        // forward the final extra range
+                        std::promise<SortedRange> p;
+                        futures.emplace_back(p.get_future());
+                        p.set_value(subranges[i]);
+                    }
+                }
+
+                subranges.clear();
+            } while (futures.size() > 1);
+            futures.front().get();
+        }
+
+        /**
+         * Quicksort worker function.
+         */
+        template <class RandIt, class Compare, class SortFunc, class PartFunc, class PivotFunc>
+        void quicksort_impl(task_thread_pool::task_thread_pool* task_pool, const RandIt first, const RandIt last,
+                            Compare comp, SortFunc sort_func, PartFunc part_func, PivotFunc pivot_func,
+                            std::ptrdiff_t target_leaf_size,
+                            std::vector<std::future<void>>* futures, std::mutex* mutex,
+                            std::condition_variable* cv, int* inflight_spawns) {
+            using T = typename std::iterator_traits<RandIt>::value_type;
+
+            auto partition_size = std::distance(first, last);
+
+            if (partition_size > target_leaf_size) {
+                // partition the range
+                auto mid = part_func(first, last, pivot_predicate<Compare, T>(comp, pivot_func(first, last)));
+
+                if (mid != first && mid != last) {
+                    // was able to partition the range, so recurse
+                    std::lock_guard<std::mutex> guard(*mutex);
+                    ++(*inflight_spawns);
+
+                    futures->emplace_back(task_pool->submit(
+                        quicksort_impl<RandIt, Compare, SortFunc, PartFunc, PivotFunc>,
+                        task_pool, first, mid, comp, sort_func, part_func, pivot_func, target_leaf_size,
+                        futures, mutex, cv, inflight_spawns));
+
+                    futures->emplace_back(task_pool->submit(
+                        quicksort_impl<RandIt, Compare, SortFunc, PartFunc, PivotFunc>,
+                        task_pool, mid, last, comp, sort_func, part_func, pivot_func, target_leaf_size,
+                        futures, mutex, cv, inflight_spawns));
+                    return;
+                }
+            }
+
+            // Range does not need to be subdivided (or was unable to subdivide). Run the sequential sort.
+            {
+                // notify main thread that partitioning may be finished
+                std::lock_guard<std::mutex> guard(*mutex);
+                --(*inflight_spawns);
+            }
+            cv->notify_one();
+
+            sort_func(first, last, comp);
+        }
+
+        /**
+         * Sort a range in parallel using quicksort.
+         *
+         * @param sort_func Sequential sort method, like std::sort or std::stable_sort
+         * @param part_func Method that partitions a range, like std::partition or std::stable_partition
+         * @param pivot_func Method that identifies the pivot
+         */
+        template <class ExecPolicy, class RandIt, class Compare, class SortFunc, class PartFunc, class PivotFunc>
+        void parallel_quicksort(ExecPolicy &&policy, RandIt first, RandIt last,
+                                Compare comp, SortFunc sort_func, PartFunc part_func, PivotFunc pivot_func) {
+            if (first == last) {
+                return;
+            }
+
+            auto& task_pool = *policy.pool();
+
+            // Target partition size. Range will be recursively partitioned into partitions no bigger than this
+            // size. Target approximately twice as many partitions as threads to reduce impact of uneven pivot
+            // selection.
+            auto num_threads = task_pool.get_num_threads();
+            std::ptrdiff_t target_leaf_size = std::max(std::distance(first, last) / (num_threads * 2),
+                                                       (std::ptrdiff_t)5);
+
+            if (num_threads == 1) {
+                target_leaf_size = std::distance(first, last);
+            }
+
+            // task_thread_pool does not support creating task DAGs, so organize the code such that
+            // all parallel tasks are independent. The parallel tasks can spawn additional parallel tasks, and they
+            // record their "child" task's std::future into a common vector to be waited on by the main thread.
+            std::mutex mutex;
+
+            // Futures of parallel tasks. Access protected by mutex.
+            std::vector<std::future<void>> futures;
+
+            // For signaling that all partitioning has been completed and futures vector is complete. Uses mutex.
+            std::condition_variable cv;
+
+            // Number of `quicksort_impl` calls that haven't finished yet. Nonzero value means futures vector may
+            // still be modified. Access protected by mutex.
+            int inflight_spawns = 1;
+
+            // Root task.
+            quicksort_impl(&task_pool, first, last, comp, sort_func, part_func, pivot_func, target_leaf_size,
+                           &futures, &mutex, &cv, &inflight_spawns);
+
+            // Wait for all partitioning to finish.
+            {
+                std::unique_lock<std::mutex> lock(mutex);
+                cv.wait(lock, [&] { return inflight_spawns == 0; });
+            }
+
+            // Wait on all the parallel tasks.
+            get_futures(futures);
+        }
+
+        /**
+         * Partition range according to predicate. Unstable.
+         *
+         * This implementation only parallelizes with p=2; will spawn and wait for only one task.
+         */
+        template <class RandIt, class Predicate>
+        RandIt partition_p2(task_thread_pool::task_thread_pool &task_pool, RandIt first, RandIt last, Predicate pred) {
+            auto range_size = std::distance(first, last);
+            if (range_size < 4) {
+                return std::partition(first, last, pred);
+            }
+
+            // approach should be generalizable to arbitrary p
+
+            RandIt mid = std::next(first + range_size / 2);
+
+            // partition left and right halves in parallel
+            auto left_future = task_pool.submit(std::partition<RandIt, Predicate>, first, mid, pred);
+            RandIt right_mid = std::partition(mid, last, pred);
+            RandIt left_mid = left_future.get();
+
+            // merge the two partitioned halves
+            auto left_highs_size = std::distance(left_mid, mid);
+            auto right_lows_size = std::distance(mid, right_mid);
+            if (left_highs_size <= right_lows_size) {
+                std::swap_ranges(left_mid, mid, right_mid - left_highs_size);
+                return right_mid - left_highs_size;
+            } else {
+                std::swap_ranges(mid, right_mid, left_mid);
+                return left_mid + right_lows_size;
+            }
+        }
+    }
+}
+
+#endif
+
+#ifndef POOLSTL_INTERNAL_THREAD_IMPL_HPP
+#define POOLSTL_INTERNAL_THREAD_IMPL_HPP
+
+/**
+ * EXPERIMENTAL: Subject to significant changes or removal.
+ * An implementation using only std::thread and no thread pool at all.
+ *
+ * Advantage:
+ * - Fewer symbols (no packaged_task with its operators, destructors, vtable, etc) means smaller binary
+ * which can mean a lot when there are many calls like with many templates.
+ * - No thread pool to manage.
+ *
+ * Disadvantages:
+ * - Threads are started and joined for every operation, so it is harder to amortize that cost.
+ * - Barely any algorithms are supported.
+ */
+
+
+
+namespace poolstl {
+    namespace internal {
+
+        /**
+         * Chunk a single range and wait for all chunks to complete (pure std::thread version).
+         */
+        template <class ExecPolicy, class RandIt, class Chunk, class ChunkRet, typename... A>
+        typename std::enable_if<is_pure_threads_policy<ExecPolicy>::value, void>::type
+        parallel_chunk_for_1_wait(ExecPolicy &&policy, RandIt first, RandIt last,
+                                  Chunk chunk, ChunkRet*, int extra_split_factor, A&&... chunk_args) {
+            std::vector<std::thread> threads;
+            auto chunk_size = get_chunk_size(first, last, extra_split_factor * policy.get_num_threads());
+
+            while (first < last) {
+                auto iter_chunk_size = get_iter_chunk_size(first, last, chunk_size);
+                RandIt loop_end = advanced(first, iter_chunk_size);
+
+                threads.emplace_back(std::thread(chunk, first, loop_end, chunk_args...));
+
+                first = loop_end;
+            }
+
+            for (auto& thread : threads) {
+                if (thread.joinable()) {
+                    thread.join();
+                }
+            }
+        }
+    }
+}
+
+#endif
+
+namespace poolstl {
+ /**
+ * NOTE: Iterators are expected to be random access.
+ *
+ * Like `std::sort`, but allows specifying the sequential sort method, which must have the
+ * same signature as the comparator version of `std::sort`.
+ *
+ * Implemented as a high-level quicksort that delegates to `sort_func`, in parallel, once the range has been
+ * sufficiently partitioned.
+ */
+    template <class ExecPolicy, class RandIt, class Compare>
+    poolstl::internal::enable_if_poolstl_policy<ExecPolicy, void>
+    pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp,
+                   void (sort_func)(RandIt, RandIt, Compare) = std::sort) {
+        if (poolstl::internal::is_seq<ExecPolicy>(policy)) {
+            sort_func(first, last, comp);
+            return;
+        }
+
+        // Parallel partition.
+        // The partition_p2 method spawns and waits for its own child task. A deadlock is possible if all worker
+        // threads are waiting for tasks that in turn have to workers to execute them. This is only an issue because
+        // our thread pool does not have the concept of dependencies.
+        // So ensure that at most half the workers can be blocked waiting on a child partition task at once;
+        // the rest fall back to a sequential std::partition.
+        auto& task_pool = *policy.pool();
+        std::atomic<int> allowed_parallel_partitions{(int)task_pool.get_num_threads() / 2};
+
+        auto part_func = [&task_pool, &allowed_parallel_partitions](RandIt chunk_first, RandIt chunk_last,
+                                                                    poolstl::internal::pivot_predicate<Compare,
+                                                                        typename std::iterator_traits<RandIt>::value_type> pred) {
+            if (allowed_parallel_partitions.fetch_sub(1) > 0) {
+                return poolstl::internal::partition_p2(task_pool, chunk_first, chunk_last, pred);
+            } else {
+                return std::partition(chunk_first, chunk_last, pred);
+            }
+        };
+
+        poolstl::internal::parallel_quicksort(std::forward<ExecPolicy>(policy), first, last, comp, sort_func,
+                                              part_func, poolstl::internal::quicksort_pivot<RandIt>);
+    }
+
+ /**
+ * NOTE: Iterators are expected to be random access.
+ *
+ * Like `std::sort`, but allows specifying the sequential sort method, which must have the
+ * same signature as the comparator version of `std::sort`.
+ *
+ * Implemented as a parallel high-level quicksort that delegates to `sort_func` once the range has been
+ * sufficiently partitioned.
+ */
+ template
+ poolstl::internal::enable_if_poolstl_policy
+ pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last,
+ void (sort_func)(RandIt, RandIt,
+ std::less::value_type>) = std::sort){
+ using T = typename std::iterator_traits::value_type;
+ pluggable_sort(std::forward