Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp blocking_kind calculation #524

Merged
merged 1 commit into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/unifex/allocate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ namespace _alloc {

static constexpr bool sends_done = sender_traits<Sender>::sends_done;

static constexpr blocking_kind blocking = sender_traits<Sender>::blocking;

template(typename Self, typename Receiver)
(requires same_as<remove_cvref_t<Self>, type> AND
receiver<Receiver>)
Expand All @@ -106,8 +108,8 @@ namespace _alloc {
static_cast<Self&&>(s).sender_, (Receiver &&) r};
}

friend constexpr auto tag_invoke(tag_t<unifex::blocking>, const type& self) noexcept {
return blocking(self.sender_);
friend constexpr blocking_kind tag_invoke(tag_t<unifex::blocking>, const type& self) noexcept {
return unifex::blocking(self.sender_);
}

Sender sender_;
Expand Down
4 changes: 4 additions & 0 deletions include/unifex/async_manual_reset_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#pragma once

#include <unifex/config.hpp>

#include <unifex/blocking.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/scheduler_concepts.hpp>
Expand Down Expand Up @@ -54,6 +56,8 @@ struct _sender {

static constexpr bool sends_done = false;

static constexpr blocking_kind blocking = blocking_kind::never;

explicit _sender(async_manual_reset_event& evt) noexcept
: evt_(&evt) {}

Expand Down
3 changes: 3 additions & 0 deletions include/unifex/async_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class async_mutex {

static constexpr bool sends_done = false;

// we complete inline if we manage to grab the lock immediately
static constexpr blocking_kind blocking = blocking_kind::maybe;

lock_sender(const lock_sender &) = delete;
lock_sender(lock_sender &&) = default;

Expand Down
6 changes: 2 additions & 4 deletions include/unifex/async_trace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,12 @@ namespace _async_trace {

static constexpr bool sends_done = false;

static constexpr blocking_kind blocking = blocking_kind::always_inline;

template <typename Receiver>
operation<Receiver> connect(Receiver&& r) const& {
return operation<Receiver>{(Receiver &&) r};
}

friend auto tag_invoke(tag_t<blocking>, const sender&) noexcept {
return blocking_kind::always_inline;
}
};
} // namespace _async_trace
using async_trace_sender = _async_trace::sender;
Expand Down
3 changes: 3 additions & 0 deletions include/unifex/at_coroutine_exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ struct _die_on_done {

static constexpr bool sends_done = false;

static constexpr blocking_kind blocking = sender_traits<Sender>::blocking;

template (typename Receiver)
(requires sender_to<Sender, _die_on_done_rec_t<Receiver>>)
auto connect(Receiver&& rec) &&
Expand Down Expand Up @@ -288,6 +290,7 @@ struct [[nodiscard]] _cleanup_task {
return std::move(std::exchange(continuation_, {}).promise().args_);
}

// TODO: how do we address always-inline awaitables
friend constexpr auto tag_invoke(tag_t<blocking>, const _cleanup_task&) noexcept {
return blocking_kind::always_inline;
}
Expand Down
79 changes: 42 additions & 37 deletions include/unifex/blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,43 @@
namespace unifex {

namespace _block {
enum class _enum {
enum class _enum : unsigned char {
// Caller guarantees that the receiver will be called inline on the
// current thread that called .start() before .start() returns.
always_inline = 0,

// Guarantees that the receiver will be called strongly-happens-before
// .start() returns. Does not guarantee the call to the receiver happens
// on the same thread that called .start(), however.
always,

// No guarantees about the timing and context on which the receiver will
// be called.
maybe = 0,
maybe,

// Always completes asynchronously.
// Guarantees that the receiver will not be called on the current thread
// before .start() returns. The receiver may be called on another thread
// before .start() returns, however, or may be called on the current thread
// some time after .start() returns.
never,
};

// Guarantees that the receiver will be called strongly-happens-before
// .start() returns. Does not guarantee the call to the receiver happens
// on the same thread that called .start(), however.
always,
constexpr bool operator<(_enum lhs, _enum rhs) noexcept {
return static_cast<unsigned char>(lhs) < static_cast<unsigned char>(rhs);
}

// Caller guarantees that the receiver will be called inline on the
// current thread that called .start() before .start() returns.
always_inline
};
constexpr bool operator>(_enum lhs, _enum rhs) noexcept {
return rhs < lhs;
}

constexpr bool operator<=(_enum lhs, _enum rhs) noexcept {
return !(lhs > rhs);
}

constexpr bool operator>=(_enum lhs, _enum rhs) noexcept {
return !(lhs < rhs);
}

struct blocking_kind {
template <_enum Kind>
Expand All @@ -68,22 +84,23 @@ struct blocking_kind {
return value;
}

friend constexpr bool operator==(blocking_kind a, blocking_kind b) noexcept {
return a.value == b.value;
}

friend constexpr bool operator!=(blocking_kind a, blocking_kind b) noexcept {
return a.value != b.value;
}

static constexpr constant<_enum::maybe> maybe {};
static constexpr constant<_enum::never> never {};
static constexpr constant<_enum::always> always {};
static constexpr constant<_enum::always_inline> always_inline {};

_enum value{};
_enum value{_enum::maybe};
};

template <typename Sender, typename = void>
struct _has_blocking : std::false_type {};

template <typename Sender>
struct _has_blocking<
Sender,
std::void_t<decltype(Sender::blocking)>>
: std::true_type {};

struct _fn {
template(typename Sender)
(requires tag_invocable<_fn, const Sender&>)
Expand All @@ -95,30 +112,18 @@ struct _fn {
template(typename Sender)
(requires (!tag_invocable<_fn, const Sender&>))
constexpr auto operator()(const Sender&) const noexcept {
return blocking_kind::maybe;
if constexpr (_has_blocking<Sender>::value) {
return blocking_kind::constant<Sender::blocking>{};
}
else {
return blocking_kind::maybe;
}
}
};

namespace _cfn {
template <_enum Kind>
static constexpr auto _kind(blocking_kind::constant<Kind> kind) noexcept {
return kind;
}
static constexpr auto _kind(blocking_kind) noexcept {
return blocking_kind::maybe;
}

template <typename T>
constexpr auto cblocking() noexcept {
using blocking_t = remove_cvref_t<decltype(_fn{}(UNIFEX_DECLVAL(T&)))>;
return _cfn::_kind(blocking_t{});
}
}

} // namespace _block

inline constexpr _block::_fn blocking {};
using _block::_cfn::cblocking;
using _block::blocking_kind;

} // namespace unifex
Expand Down
6 changes: 6 additions & 0 deletions include/unifex/bulk_join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class _join_sender<Source>::type {

static constexpr bool sends_done = sender_traits<Source>::sends_done;

static constexpr blocking_kind blocking = sender_traits<Source>::blocking;

template<typename Source2>
explicit type(Source2&& s)
noexcept(std::is_nothrow_constructible_v<Source, Source2>)
Expand All @@ -122,6 +124,10 @@ class _join_sender<Source>::type {
join_receiver<remove_cvref_t<Receiver>>{static_cast<Receiver&&>(r)});
}

friend constexpr blocking_kind tag_invoke(tag_t<unifex::blocking>, const type& s) noexcept {
return unifex::blocking(s.source_);
}

private:
Source source_;
};
Expand Down
6 changes: 6 additions & 0 deletions include/unifex/bulk_schedule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ class _default_sender<Scheduler, Integral>::type {

static constexpr bool sends_done = true;

static constexpr blocking_kind blocking = sender_traits<schedule_sender_t>::blocking;

template<typename Scheduler2>
explicit type(Scheduler2&& s, Integral count)
: scheduler_(static_cast<Scheduler2&&>(s))
Expand All @@ -185,6 +187,10 @@ class _default_sender<Scheduler, Integral>::type {
static_cast<BulkReceiver&&>(r)});
}

friend blocking_kind tag_invoke(tag_t<blocking>, const type& self) noexcept {
return unifex::blocking(self.scheduler_);
}

private:
Scheduler scheduler_;
Integral count_;
Expand Down
6 changes: 6 additions & 0 deletions include/unifex/bulk_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class _tfx_sender<Source, Func, Policy>::type {

static constexpr bool sends_done = sender_traits<Source>::sends_done;

static constexpr blocking_kind blocking = sender_traits<Source>::blocking;

template<typename Source2, typename Func2>
explicit type(Source2&& source, Func2&& func, Policy policy)
: source_((Source2&&)source)
Expand Down Expand Up @@ -189,6 +191,10 @@ class _tfx_sender<Source, Func, Policy>::type {
static_cast<Receiver&&>(r)});
}

friend constexpr blocking_kind tag_invoke(tag_t<unifex::blocking>, const type& s) noexcept {
return unifex::blocking(s.source_);
}

private:
UNIFEX_NO_UNIQUE_ADDRESS Source source_;
UNIFEX_NO_UNIQUE_ADDRESS Func func_;
Expand Down
2 changes: 2 additions & 0 deletions include/unifex/connect_awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ namespace _as_sender {
return unifex::connect_awaitable(((type&&) t).awaitable_, (Receiver&&) r);
}

// TODO: how do we make this property statically discoverable?
friend constexpr auto tag_invoke(tag_t<unifex::blocking>, const type& t) noexcept {
return unifex::blocking(t.awaitable_);
}
Expand Down Expand Up @@ -267,6 +268,7 @@ namespace _as_sender {
return unifex::connect_awaitable(((type&&) t).awaitable_, (Receiver&&) r);
}

// TODO: how do we make this property statically discoverable?
friend constexpr auto tag_invoke(tag_t<unifex::blocking>, const type& t) noexcept {
return unifex::blocking(t.awaitable_);
}
Expand Down
6 changes: 5 additions & 1 deletion include/unifex/create.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ struct _snd_base {

static constexpr bool sends_done = true;

// there's no way to know; maybe create() should have a way for the user to
// specify
static constexpr blocking_kind blocking = blocking_kind::maybe;

template (typename Self, typename Receiver)
(requires derived_from<remove_cvref_t<Self>, type> AND
constructible_from<Fn, member_t<Self, Fn>> AND
Expand Down Expand Up @@ -157,7 +161,7 @@ struct _fn {
* \fn template <class... ValueTypes> auto create(auto fn [, auto ctx])
* \brief A utility for building a sender-based API out of a C-style API that
* accepts a void* context and a function pointer continuation.
*
*
* \em Example:
* \code
* // A void-returning C-style async API that accepts a context and a continuation:
Expand Down
7 changes: 5 additions & 2 deletions include/unifex/dematerialize.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ namespace _demat {

static constexpr bool sends_done = sender_traits<Source>::sends_done;

static constexpr blocking_kind blocking = sender_traits<Source>::blocking;

template <typename Source2>
explicit type(Source2&& source)
noexcept(std::is_nothrow_constructible_v<Source, Source2>)
Expand All @@ -180,8 +182,9 @@ namespace _demat {
receiver_t<Receiver>{static_cast<Receiver&&>(r)});
}

friend constexpr auto tag_invoke(tag_t<unifex::blocking>, const type& self) noexcept {
return blocking(self.source_);
friend constexpr blocking_kind
tag_invoke(tag_t<unifex::blocking>, const type& self) noexcept {
return unifex::blocking(self.source_);
}
private:
Source source_;
Expand Down
27 changes: 12 additions & 15 deletions include/unifex/detach_on_cancel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/type_list.hpp>

#include <algorithm>
#include <atomic>
#include <exception>

Expand Down Expand Up @@ -204,21 +206,16 @@ struct _sender<Sender>::type {

static constexpr bool sends_done = true;

friend constexpr auto tag_invoke(tag_t<blocking>, const type& sender) noexcept {
if constexpr (same_as<blocking_kind,
decltype(blocking(sender.upstreamSender_))>) {
// the sender returns a runtime-determined blocking_kind
blocking_kind blockValue = blocking(sender.upstreamSender_);
if (blockValue == blocking_kind::never) {
blockValue = blocking_kind::maybe;
}
return blockValue;
} else if constexpr (blocking_kind::never == cblocking<Sender>()) {
// the sender always returns never
return blocking_kind::maybe;
} else {
return cblocking<Sender>();
}
// We will complete inline if started with a stop token that has had stop
// requested. If Sender is maybe or never then we're maybe overall; if Sender is
// always then we can report always; otherwise Sender is always_inline and we
// can report that.
static constexpr blocking_kind blocking =
std::min(blocking_kind::maybe(), sender_traits<Sender>::blocking());

friend constexpr blocking_kind tag_invoke(tag_t<blocking>, const type& sender) noexcept {
blocking_kind other{blocking(sender)};
return std::min(blocking_kind::maybe(), other());
}

template <typename This, typename Receiver>
Expand Down
Loading