Skip to content

Commit

Permalink
add parallel_range_blocks_multi
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzziqersoftware committed Nov 3, 2024
1 parent e33d770 commit a8fba6b
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 15 deletions.
37 changes: 37 additions & 0 deletions src/Tools.hh
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ IntT parallel_range_blocks(
if (num_threads == 0) {
num_threads = std::thread::hardware_concurrency();
}
if (num_threads < 1) {
throw std::logic_error("thread count must be at least 1");
}

std::atomic<IntT> current_value(start_value);
std::atomic<IntT> result_value(end_value);
Expand Down Expand Up @@ -189,4 +192,38 @@ IntT parallel_range_blocks(
return result_value;
}

// Like parallel_range_blocks, but returns all values for which fn returned
// true. (Unlike the other parallel_range functions, this one does not return
// early.)
//
// Template parameters:
//   IntT: integer type of the values passed to fn.
//   RetT: container type used to collect matches and returned to the caller.
//     It must be default-constructible and support emplace(value) and
//     insert(first, last), as std::unordered_set<IntT> (the default) does.
//
// Arguments:
//   fn: predicate called with each value and the calling thread's number.
//   start_value, end_value, block_size, progress_fn: forwarded unchanged to
//     parallel_range_blocks; see that function for their exact meaning.
//   num_threads: number of worker threads; 0 means use
//     std::thread::hardware_concurrency().
//
// Throws std::logic_error if the thread count is still zero after applying
// the default (hardware_concurrency() is allowed to return 0).
template <typename IntT = uint64_t, typename RetT = std::unordered_set<IntT>>
RetT parallel_range_blocks_multi(
    std::function<bool(IntT value, size_t thread_num)> fn,
    IntT start_value,
    IntT end_value,
    IntT block_size,
    size_t num_threads = 0,
    std::function<void(IntT start_value, IntT end_value, IntT current_value, uint64_t start_time_usecs)> progress_fn = parallel_range_default_progress_fn<IntT>) {

  if (num_threads == 0) {
    num_threads = std::thread::hardware_concurrency();
  }
  // Same check as parallel_range_blocks; repeated here so that indexing
  // thread_rets[0] below is always valid.
  if (num_threads < 1) {
    throw std::logic_error("thread count must be at least 1");
  }

  // One result container per thread, indexed by thread_num, so no locking is
  // used while collecting. NOTE(review): this assumes parallel_range_blocks
  // only passes thread_num values in [0, num_threads) and that each
  // thread_num is used by a single thread - confirm against its definition.
  std::vector<RetT> thread_rets(num_threads);
  parallel_range_blocks<IntT>(
      [&](IntT z, size_t thread_num) {
        if (fn(z, thread_num)) {
          thread_rets[thread_num].emplace(z);
        }
        // Always report "not found" so the scan is not cut short by a match.
        return false;
      },
      start_value, end_value, block_size, num_threads, progress_fn);

  // Merge the per-thread results into the first thread's container.
  RetT ret = std::move(thread_rets[0]);
  for (size_t z = 1; z < thread_rets.size(); z++) {
    auto& thread_ret = thread_rets[z];
    ret.insert(std::make_move_iterator(thread_ret.begin()), std::make_move_iterator(thread_ret.end()));
  }

  return ret;
}

} // namespace phosg
72 changes: 57 additions & 15 deletions src/ToolsTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ int main(int, char**) {
expect_eq(sum, hits.size());
}

{
printf("-- parallel_range return value\n");
uint64_t target_value = 0xC349;
auto is_equal = [&](uint64_t v, size_t) -> bool {
return (v == target_value);
};
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), target_value);
// Note: We can't check that parallel_range ends early when fn returns true
// because it's not actually guaranteed to do so - it's only guaranteed to
// return any of the values for which fn returns true. One could imagine a sequence of events in
// which the target value's call takes a very long time, and all other threads
// could finish checking all other values before the target one returns true.
target_value = 0xCC349; // > end_value; should not be found
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), 0x10000);
}

{
printf("-- parallel_range_blocks\n");
vector<uint8_t> hits(0x1000000, 0);
Expand Down Expand Up @@ -76,35 +92,61 @@ int main(int, char**) {
}

{
printf("-- parallel_range return value\n");
printf("-- parallel_range_blocks return value\n");
uint64_t target_value = 0xC349;
auto is_equal = [&](uint64_t v, size_t) -> bool {
return (v == target_value);
};
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), target_value);
expect_eq((parallel_range_blocks<uint64_t>(is_equal, 0, 0x100000, 0x1000, num_threads, nullptr)), target_value);
// Note: We can't check that parallel_range ends early when fn returns true
// because it's not actually guaranteed to do so - it's only guaranteed to
// return any of the values for which fn returns true. One could imagine a sequence of events in
// which the target value's call takes a very long time, and all other threads
// could finish checking all other values before the target one returns true.
target_value = 0xCC349; // > end_value; should not be found
expect_eq((parallel_range<uint64_t>(is_equal, 0, 0x10000, num_threads, nullptr)), 0x10000);
target_value = 0xCCC349; // > end_value; should not be found
expect_eq((parallel_range_blocks<uint64_t>(is_equal, 0, 0x100000, 0x1000, num_threads, nullptr)), 0x100000);
}

{
printf("-- parallel_range_blocks return value\n");
uint64_t target_value = 0xC349;
printf("-- parallel_range_blocks_multi\n");
vector<uint8_t> hits(0x1000000, 0);
auto handle_value = [&](uint64_t v, size_t thread_num) -> bool {
hits[v] = thread_num + 1;
return false;
};
uint64_t start_time = now();
parallel_range_blocks_multi<uint64_t>(handle_value, 0, hits.size(), 0x1000, num_threads, nullptr);
uint64_t duration = now() - start_time;
fprintf(stderr, "---- time: %" PRIu64 "\n", duration);

vector<size_t> thread_counts(num_threads, 0);
for (size_t x = 0; x < hits.size(); x++) {
expect_ne(hits[x], 0);
thread_counts.at(hits[x] - 1)++;
}

size_t sum = 0;
for (size_t x = 0; x < thread_counts.size(); x++) {
expect_ne(thread_counts[x], 0);
fprintf(stderr, "---- thread %zu: %zu\n", x, thread_counts[x]);
sum += thread_counts[x];
}
expect_eq(sum, hits.size());
}

{
printf("-- parallel_range_blocks_multi return value\n");
uint64_t target_value1 = 0xC349;
uint64_t target_value2 = 0x53A0;
uint64_t target_value3 = 0x034D;
auto is_equal = [&](uint64_t v, size_t) -> bool {
return (v == target_value);
return ((v == target_value1) || (v == target_value2) || (v == target_value3));
};
expect_eq((parallel_range_blocks<uint64_t>(is_equal, 0, 0x100000, 0x1000, num_threads, nullptr)), target_value);
// Note: We can't check that parallel_range ends early when fn returns true
// because it's not actually guaranteed to do so - it's only guaranteed to
// return any of the values for which fn returns true. One could imagine a sequence of events in
// which the target value's call takes a very long time, and all other threads
// could finish checking all other values before the target one returns true.
target_value = 0xCCC349; // > end_value; should not be found
expect_eq((parallel_range_blocks<uint64_t>(is_equal, 0, 0x100000, 0x1000, num_threads, nullptr)), 0x100000);
auto found = parallel_range_blocks_multi<uint64_t>(is_equal, 0, 0x100000, 0x1000, num_threads, nullptr);
expect_eq(3, found.size());
expect(found.count(target_value1));
expect(found.count(target_value2));
expect(found.count(target_value3));
}

printf("ToolsTest: all tests passed\n");
Expand Down

0 comments on commit a8fba6b

Please sign in to comment.