Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to support CUDA backend #727

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions examples/shp/vector_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,27 @@
// SPDX-License-Identifier: BSD-3-Clause

#include <dr/shp.hpp>
#include <fmt/ranges.h>

int main(int argc, char **argv) {
printf("Creating NUMA devices...\n");
auto devices = dr::shp::get_numa_devices(sycl::default_selector_v);
dr::shp::init(devices);
dr::shp::init(sycl::default_selector_v);

for (auto &device : devices) {
for (auto &device : dr::shp::devices()) {
std::cout << " Device: " << device.get_info<sycl::info::device::name>()
<< "\n";
}

fmt::print("First check...\n");
dr::shp::check_queues();

fmt::print("Initializing distributed vector...\n");
dr::shp::distributed_vector<int, dr::shp::device_allocator<int>> v(100);

fmt::print("Second check...\n");
dr::shp::check_queues();

fmt::print("For each...\n");
dr::shp::for_each(dr::shp::par_unseq, dr::shp::enumerate(v),
[](auto &&tuple) {
auto &&[idx, value] = tuple;
Expand All @@ -25,6 +33,10 @@ int main(int argc, char **argv) {
dr::shp::for_each(dr::shp::par_unseq, v,
[](auto &&value) { value = value + 2; });

fmt::print("Third check...\n");
dr::shp::check_queues();

fmt::print("Reduce...\n");
std::size_t sum = dr::shp::reduce(dr::shp::par_unseq, v, int(0), std::plus{});

dr::shp::print_range(v);
Expand All @@ -34,17 +46,29 @@ int main(int argc, char **argv) {
std::vector<int> local_vec(v.size());
std::iota(local_vec.begin(), local_vec.end(), 0);

fmt::print("Fourth check...\n");
dr::shp::check_queues();

dr::shp::print_range(local_vec, "local vec");

fmt::print("Fourth Two check...\n");
dr::shp::check_queues();

dr::shp::copy(local_vec.begin(), local_vec.end(), v.begin());

fmt::print("Fourth Three check...\n");
dr::shp::check_queues();

dr::shp::print_range(v, "vec after copy");

dr::shp::for_each(dr::shp::par_unseq, v,
[](auto &&value) { value = value + 2; });

dr::shp::print_range(v, "vec after update");

fmt::print("Fourth One check...\n");
dr::shp::check_queues();

dr::shp::copy(v.begin(), v.end(), local_vec.begin());

dr::shp::print_range(local_vec, "local vec after copy");
Expand All @@ -55,5 +79,22 @@ int main(int argc, char **argv) {
v.resize(50);
dr::shp::print_range(v, "resized to 50");

fmt::print("Fifth check...\n");
dr::shp::check_queues();

fmt::print("Getting ready to finalize...\n");
fflush(stdout);

fmt::print("Check queues...\n");
dr::shp::check_queues();

fmt::print("Finalizing...\n");
fflush(stdout);

dr::shp::finalize();


fmt::print("Exiting...\n");
fflush(stdout);
return 0;
}
24 changes: 22 additions & 2 deletions include/dr/shp/algorithms/copy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ sycl::event copy_async(InputIt first, InputIt last, OutputIt d_first) {

std::vector<sycl::event> events;

fmt::print("copy_async...\n");

while (first != last) {
auto &&segment = *segment_iter;
auto size = rng::distance(segment);
Expand All @@ -120,14 +122,23 @@ sycl::event copy_async(InputIt first, InputIt last, OutputIt d_first) {
auto local_last = first;
rng::advance(local_last, n_to_copy);

fmt::print("copying...\n");
events.emplace_back(
dr::shp::copy_async(first, local_last, rng::begin(segment)));

fmt::print("check queues...\n");
dr::shp::check_queues();

fmt::print("continue...\n");
++segment_iter;
rng::advance(first, n_to_copy);
}

return dr::shp::__detail::combine_events(events);
for (auto&& event : events) {
event.wait();
}
return events[0];
// return dr::shp::__detail::combine_events(events);
}

auto copy(rng::contiguous_range auto r, dr::distributed_iterator auto d_first) {
Expand All @@ -142,7 +153,11 @@ template <std::forward_iterator InputIt, dr::distributed_iterator OutputIt>
requires __detail::is_syclmemcopyable<std::iter_value_t<InputIt>,
std::iter_value_t<OutputIt>>
OutputIt copy(InputIt first, InputIt last, OutputIt d_first) {
fmt::print("Async copy...\n");
copy_async(first, last, d_first).wait();
fmt::print("Checking queues...\n");
dr::shp::check_queues();
fmt::print("Returning...\n");
return d_first + (last - first);
}

Expand All @@ -166,7 +181,12 @@ sycl::event copy_async(InputIt first, InputIt last, OutputIt d_first) {
rng::advance(d_first, size);
}

return dr::shp::__detail::combine_events(events);
for (auto&& event : events) {
event.wait();
}
return events[0];

// return dr::shp::__detail::combine_events(events);
}

template <dr::distributed_iterator InputIt, std::forward_iterator OutputIt>
Expand Down
9 changes: 4 additions & 5 deletions include/dr/shp/algorithms/exclusive_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ void exclusive_scan_impl_(ExecutionPolicy &&policy, R &&r, O &&o, U init,
if constexpr (std::is_same_v<std::remove_cvref_t<ExecutionPolicy>,
device_policy>) {

U *d_inits = sycl::malloc_device<U>(rng::size(zipped_segments),
shp::devices()[0], shp::context());
U *d_inits =
sycl::malloc_device<U>(rng::size(zipped_segments), __detail::queue(0));

std::vector<sycl::event> events;

Expand All @@ -63,12 +63,11 @@ void exclusive_scan_impl_(ExecutionPolicy &&policy, R &&r, O &&o, U init,

shp::copy(d_inits, d_inits + inits.size(), inits.data() + 1);

sycl::free(d_inits, shp::context());
sycl::free(d_inits, __detail::queue(0));

inits[0] = init;

auto root = dr::shp::devices()[0];
dr::shp::device_allocator<T> allocator(dr::shp::context(), root);
dr::shp::device_allocator<T> allocator(__detail::queue(0));
dr::shp::vector<T, dr::shp::device_allocator<T>> partial_sums(
std::size_t(zipped_segments.size()), allocator);

Expand Down
4 changes: 4 additions & 0 deletions include/dr/shp/algorithms/fill.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ template <typename T, typename U>
requires(std::indirectly_writable<device_ptr<T>, U>)
sycl::event fill_async(device_ptr<T> first, device_ptr<T> last,
const U &value) {
fmt::print("Fill async...\n");
auto &&q = __detail::get_queue_for_pointer(first);
fmt::print("Got queue...\n");
auto *arr = first.get_raw_pointer();
// not using q.fill because of CMPLRLLVM-46438
return dr::__detail::parallel_for(q, sycl::range<>(last - first),
Expand All @@ -49,7 +51,9 @@ sycl::event fill_async(device_ptr<T> first, device_ptr<T> last,
template <typename T, typename U>
requires(std::indirectly_writable<device_ptr<T>, U>)
void fill(device_ptr<T> first, device_ptr<T> last, const U &value) {
fmt::print("Fill...\n");
fill_async(first, last, value).wait();
fmt::print("Fill.\n");
}

template <typename T, dr::remote_contiguous_range R>
Expand Down
3 changes: 1 addition & 2 deletions include/dr/shp/algorithms/inclusive_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ void inclusive_scan_impl_(ExecutionPolicy &&policy, R &&r, O &&o,

std::vector<sycl::event> events;

auto root = dr::shp::devices()[0];
dr::shp::device_allocator<T> allocator(dr::shp::context(), root);
dr::shp::device_allocator<T> allocator(__detail::queue(0));
dr::shp::vector<T, dr::shp::device_allocator<T>> partial_sums(
std::size_t(zipped_segments.size()), allocator);

Expand Down
24 changes: 10 additions & 14 deletions include/dr/shp/algorithms/sort.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ void sort(R &&r, Compare comp = Compare()) {
// Each segment has `n_splitters` medians,
// so `n_segments * n_splitters` medians total.

T *medians = sycl::malloc_device<T>(n_segments * n_splitters,
shp::devices()[0], shp::context());
T *medians =
sycl::malloc_device<T>(n_segments * n_splitters, shp::__detail::queue(0));
std::size_t segment_id = 0;

for (auto &&segment : segments) {
Expand Down Expand Up @@ -150,21 +150,19 @@ void sort(R &&r, Compare comp = Compare()) {

auto &&local_segment = dr::shp::__detail::local(segment);

std::size_t *splitter_i = sycl::malloc_shared<std::size_t>(
n_splitters, q.get_device(), shp::context());
std::size_t *splitter_i = sycl::malloc_shared<std::size_t>(n_splitters, q);
splitter_indices.push_back(splitter_i);

// Local copy `medians_l` necessary due to [GSD-3893]
T *medians_l =
sycl::malloc_device<T>(n_splitters, q.get_device(), shp::context());
T *medians_l = sycl::malloc_device<T>(n_splitters, q);

q.memcpy(medians_l, medians, sizeof(T) * n_splitters).wait();

__detail::lower_bound(local_policy, rng::begin(local_segment),
rng::end(local_segment), medians_l,
medians_l + n_splitters, splitter_i, comp);

sycl::free(medians_l, shp::context());
sycl::free(medians_l, q);

auto p_first = rng::begin(local_segment);
auto p_last = p_first;
Expand Down Expand Up @@ -269,15 +267,13 @@ void sort(R &&r, Compare comp = Compare()) {

// Free temporary memory.

for (auto &&sorted_seg : sorted_segments) {
sycl::free(sorted_seg, shp::context());
}

for (auto &&splitter_i : splitter_indices) {
sycl::free(splitter_i, shp::context());
for (std::size_t i = 0; i < sorted_segments.size(); i++) {
auto &&q = dr::shp::__detail::queue(dr::ranges::rank(segments[i]));
sycl::free(sorted_segments[i], q);
sycl::free(splitter_indices[i], q);
}

sycl::free(medians, shp::context());
sycl::free(medians, shp::__detail::queue(0));
}

template <dr::distributed_iterator RandomIt, typename Compare = std::less<>>
Expand Down
35 changes: 30 additions & 5 deletions include/dr/shp/detail.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <iterator>
#include <sycl/sycl.hpp>

#include <fmt/ranges.h>

namespace dr::shp {

namespace __detail {
Expand All @@ -23,26 +25,48 @@ concept is_syclmemcopyable = std::is_same_v<std::remove_const_t<Src>, Dest> &&

template <std::contiguous_iterator Iter>
sycl::usm::alloc get_pointer_type(Iter iter) {
return sycl::get_pointer_type(std::to_address(iter), shp::context());
for (auto&& device : shp::devices()) {
try {
return sycl::get_pointer_type(std::to_address(iter), __detail::queue(device).get_context());
} catch(...) {}
}
assert(false);
}

template <typename T>
sycl::usm::alloc get_pointer_type(shp::device_ptr<T> ptr) {
return sycl::get_pointer_type(ptr.get_raw_pointer(), shp::context());
for (auto&& device : shp::devices()) {
try {
return sycl::get_pointer_type(ptr.get_raw_pointer(), __detail::queue(device).get_context());
} catch(...) {}
}
assert(false);
}

template <std::contiguous_iterator Iter>
sycl::device get_pointer_device(Iter iter) {
return sycl::get_pointer_device(std::to_address(iter), shp::context());
for (auto&& device : shp::devices()) {
try {
return sycl::get_pointer_device(std::to_address(iter), __detail::queue(device).get_context());
} catch(...) {}
}
assert(false);
}

template <typename T> sycl::device get_pointer_device(shp::device_ptr<T> ptr) {
return sycl::get_pointer_device(ptr.get_raw_pointer(), shp::context());
for (auto&& device : shp::devices()) {
try {
return sycl::get_pointer_device(ptr.get_raw_pointer(), __detail::queue(device).get_context());
} catch(...) {}
}
assert(false);
}

template <typename InputIt> sycl::queue &get_queue_for_pointer(InputIt iter) {
if (get_pointer_type(iter) == sycl::usm::alloc::device) {
fmt::print("Get pointer device...\n");
auto device = get_pointer_device(iter);
fmt::print("Got device...\n");
return __detail::queue(device);
} else {
return default_queue();
Expand Down Expand Up @@ -73,7 +97,8 @@ inline sycl::event combine_events(sycl::queue &q,
}

inline sycl::event combine_events(const std::vector<sycl::event> &events) {
auto &&q = __detail::queue(0);
sycl::queue q(sycl::cpu_selector_v);
// auto &&q = __detail::queue(0);
return combine_events(q, events);
}

Expand Down
12 changes: 8 additions & 4 deletions include/dr/shp/distributed_vector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <dr/shp/device_vector.hpp>
#include <dr/shp/vector.hpp>

#include <fmt/ranges.h>

namespace dr::shp {

template <typename T, typename L> class distributed_vector_accessor {
Expand Down Expand Up @@ -143,11 +145,13 @@ struct distributed_vector {
(count + dr::shp::devices().size() - 1) / dr::shp::devices().size();
capacity_ = segment_size_ * dr::shp::devices().size();

std::size_t rank = 0;
for (auto &&device : dr::shp::devices()) {
segments_.emplace_back(segment_type(
segment_size_, Allocator(dr::shp::context(), device), rank++));
fmt::print("Allocating segments...\n");
for (std::size_t rank = 0; rank < dr::shp::devices().size(); rank++) {
fmt::print("Segment {}...\n", rank);
segments_.emplace_back(
segment_type(segment_size_, Allocator(__detail::queue(rank)), rank));
}
fmt::print("Returning...\n");
}

distributed_vector(std::size_t count, const T &value)
Expand Down
Loading
Loading