#include <asiochan/asiochan.hpp>
using namespace asiochan;
This library provides golang-inspired channel types to be used with ASIO awaitable
coroutines.
Channels allow bidirectional message passing and synchronization between coroutines.
Both standalone and boost versions of ASIO are supported.
See the installation section for how to install the library and select the ASIO distribution to use.
auto sum_subtask(
read_channel<std::optional<int>> in,
write_channel<int> out)
-> asio::awaitable<void>
{
auto sum = 0;
while (auto value = co_await in.read())
{
sum += *value;
}
co_await out.write(sum);
}
auto sum_task(std::span<int const> array, int num_tasks)
-> asio::awaitable<int>
{
auto executor = co_await asio::this_coro::executor;
// Spawn N child routines, sharing the same in/out channels
auto in = channel<std::optional<int>>{};
auto out = channel<int>{};
for (auto i : std::views::iota(0, num_tasks))
{
asio::co_spawn(executor, sum_subtask(in, out), asio::detached);
}
// Send the array to the child routines
for (auto val : array)
{
co_await in.write(val);
}
for (auto i : std::views::iota(0, num_tasks - 1))
{
// Join a task
co_await in.write(std::nullopt);
// Retrieve its result
auto subresult = co_await out.read();
// Send it to another task
co_await in.write(subresult);
}
// Join the last task
co_await in.write(std::nullopt);
// Retrieve the complete result
co_return co_await out.read();
}
auto main() -> int
{
auto tp = asio::thread_pool{};
auto numbers = std::vector<int>(100);
std::iota(numbers.begin(), numbers.end(), 1);
auto task = asio::co_spawn(tp, sum_task(numbers, 10), asio::use_future);
std::cout << "The result is " << task.get();
return 0;
}
- Thread safety - all channel types are thread-safe.
- Value semantics - channels are intended to be passed by value. Internally, a channel holds a shared_ptr to a shared state type, similar to future and promise.
- Bidirectional - channels are bidirectional by default, but can be restricted to write or read only (similar to channels in golang).
- Synchronization - by default, a writer will wait until someone reads the value. Readers and writers are queued in FIFO order. Similar to golang channels, it is possible to specify a buffer size; writing is wait-free as long as there is space in the buffer. A dynamically sized buffer that is always wait-free for the writer is also available.
- Channels of void - channels that do not send any values and are used only for synchronization are also possible. When buffered, the buffer is implemented as a simple counter (and does not allocate even when dynamically sized).
- It is possible to simultaneously await multiple alternative read / write channel operations, similar to Go's select statement, see select. This allows e.g. for easy implementation of cancellation / timeouts.
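The value-semantics point above can be illustrated with a small standard-library sketch. It is not asiochan code: shared_queue_handle and its members are hypothetical names, the type is not thread-safe, and it only shows how copies of a handle that wraps a shared_ptr end up observing the same state.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical illustration only: not asiochan's channel, and not thread-safe.
template <typename T>
class shared_queue_handle
{
  public:
    // Default construction creates a fresh shared state.
    shared_queue_handle() : state_{std::make_shared<std::deque<T>>()} {}

    // Copying the handle copies the shared_ptr, not the queued values,
    // so every copy pushes into the same underlying queue.
    void push(T value) { state_->push_back(std::move(value)); }

    // Returns the oldest value, or nullopt if the queue is empty.
    auto try_pop() -> std::optional<T>
    {
        if (state_->empty()) { return std::nullopt; }
        auto value = std::move(state_->front());
        state_->pop_front();
        return value;
    }

    auto shares_state_with(shared_queue_handle const& other) const -> bool
    {
        return state_ == other.state_;
    }

  private:
    std::shared_ptr<std::deque<T>> state_;
};

inline void value_semantics_demo()
{
    auto original = shared_queue_handle<int>{};
    auto copy = original;                         // shares state with original
    auto unrelated = shared_queue_handle<int>{};  // has its own state

    original.push(42);
    auto const received = copy.try_pop();         // value pushed via original

    assert(copy.shares_state_with(original));
    assert(!unrelated.shares_state_with(original));
    assert(received == 42);
}
```

Passing such a handle by value into a coroutine is cheap and keeps the shared state alive for as long as any copy exists, which is presumably why channels are meant to be passed by value.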
#include <asiochan/sendable.hpp>
template <typename T>
concept sendable;
The sendable concept defines the requirements for types that can be sent via channels. It is satisfied by all nothrow-movable types, and void.
#include <asiochan/channel.hpp>
template <sendable T, channel_buff_size buff_size, asio::execution::executor Executor>
class basic_channel;
template <sendable T, channel_buff_size buff_size, asio::execution::executor Executor>
class basic_read_channel;
template <sendable T, channel_buff_size buff_size, asio::execution::executor Executor>
class basic_write_channel;
Bidirectional channels can be converted to matching read and write channel types as long as the value type, buffer size, and executor match. Read and write channels are not interconvertible, to preserve type-safety.
buff_size (size_t) specifies the size of the internal buffer. When 0, the writer will always wait for a read. A special value unbounded_channel_buff can be used, in which case the buffer is dynamic and writers never wait.
template <sendable T, channel_buff_size buff_size_ = 0>
using channel = basic_channel<T, buff_size_, asio::any_io_executor>;
template <sendable T, channel_buff_size buff_size_ = 0>
using read_channel = basic_read_channel<T, buff_size_, asio::any_io_executor>;
template <sendable T, channel_buff_size buff_size_ = 0>
using write_channel = basic_write_channel<T, buff_size_, asio::any_io_executor>;
template <sendable T>
using unbounded_channel = channel<T, unbounded_channel_buff>;
template <sendable T>
using unbounded_read_channel = read_channel<T, unbounded_channel_buff>;
template <sendable T>
using unbounded_write_channel = write_channel<T, unbounded_channel_buff>;
channel<void> chan1{}; // Default constructor - creates a new shared state
auto chan2 = chan1; // Copy constructor - now shares state with chan1
auto chan3 = std::move(chan1); // Move constructor - chan1 is now invalid.
channel<int> chan{};
std::optional<int> maybe_result = chan.try_read();
int result = co_await chan.read();
channel<void> chan_void{};
bool success = chan_void.try_read();
co_await chan_void.read();
The try_read method does not perform any waiting. If no value is available, nullopt (or false for channel<void>) is returned.
The read method will wait until a value is ready.
bool success = chan.try_write(1);
co_await chan.write(1);
bool success = chan_void.try_write();
co_await chan_void.write();
The try_write method does not perform any waiting. If no reader was waiting and the internal buffer was full, false is returned.
The write method will wait until a reader is ready or, for buffered channels, until there is space in the buffer.
Note that for unbounded buffered channels, writing always succeeds without waiting. To reflect this, the try_write method is not available, and write can be called without co_await.
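The non-waiting semantics described above can be modelled with a plain bounded queue. The sketch below is a single-threaded illustration, not asiochan's implementation: bounded_buffer_model is a hypothetical name, and because the model has no waiting readers, a capacity of 0 makes every try_write fail.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>

// Hypothetical single-threaded model; not asiochan's implementation.
template <typename T, std::size_t capacity>
class bounded_buffer_model
{
  public:
    // Succeeds only while there is free space in the buffer.
    auto try_write(T value) -> bool
    {
        if (buffer_.size() >= capacity) { return false; }
        buffer_.push_back(std::move(value));
        return true;
    }

    // Returns the oldest buffered value, or nullopt if there is none.
    auto try_read() -> std::optional<T>
    {
        if (buffer_.empty()) { return std::nullopt; }
        auto value = std::move(buffer_.front());
        buffer_.pop_front();
        return value;
    }

  private:
    std::deque<T> buffer_;
};

inline void bounded_buffer_demo()
{
    auto buffered = bounded_buffer_model<int, 2>{};
    bool const first = buffered.try_write(1);
    bool const second = buffered.try_write(2);
    bool const third = buffered.try_write(3);  // buffer is full here
    auto const front = buffered.try_read();    // values come out in FIFO order

    auto unbuffered = bounded_buffer_model<int, 0>{};
    bool const rendezvous = unbuffered.try_write(1);  // no reader, no buffer

    assert(first && second);
    assert(!third);
    assert(front == 1);
    assert(!rendezvous);
}
```

In the real library a try_write on an unbuffered channel can still succeed when a reader is already waiting; the model leaves that case out to stay small.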
#include <asiochan/select.hpp>
The select function allows awaiting on multiple alternative channel operations. The first ready operation will cancel all others. Cancellation is fully deterministic: for example, when you await reads on two different channels, only one of them will have a value consumed.
channel<void> chan_void_1{};
channel<void> chan_void_2{};
channel<int> chan_int_1{};
channel<int> chan_int_2{};
auto result = co_await select(
ops::read(chan_void_1),
ops::write(chan_void_2),
ops::read(chan_int_1),
ops::write(std::rand(), chan_int_2));
bool received_void = result.received<void>();
bool sent_void = result.sent<void>();
// Non-owning pointer inside the result object if int was received, nullptr otherwise.
int* maybe_received_int = result.get_if_received<int>();
bool sent_int = result.sent<int>();
if (result.received<int>())
{
// The get_received<T> method will throw bad_select_result_access if you get the type wrong.
int received_int = result.get_received<int>();
}
If you don't want to wait until some operation becomes ready, you can use the wait-free function select_ready. It must be passed some default wait-free operation as the last argument. An example of a wait-free operation is nothing:
auto result = select_ready(
ops::read(chan_int_1),
ops::write(std::rand(), chan_int_2),
ops::nothing);
// If nothing is an alternative, has_value() method is available...
bool any_succeeded = result.has_value();
// .. and the result is contextually convertible to bool.
if (result)
{
// ...
}
Writing to an unbounded channel is also a wait-free operation, and can thus be used as the default operation for select_ready.
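The idea behind wait-free selection, that exactly one ready alternative is consumed and a fallback applies when none is ready, can be modelled with ordinary queues. This is a single-threaded sketch with hypothetical names (poll_result, poll_first_ready), not asiochan's select_ready. Picking the first ready queue in argument order is an assumption of the model, not a documented guarantee of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <initializer_list>
#include <optional>
#include <utility>

// Hypothetical single-threaded model; not asiochan's select_ready.
template <typename T>
struct poll_result
{
    std::size_t index;  // which queue supplied the value
    T value;
};

// Tries each queue in order and consumes from the first non-empty one.
// An empty optional plays the role of the "nothing" alternative.
template <typename T>
auto poll_first_ready(std::initializer_list<std::deque<T>*> queues)
    -> std::optional<poll_result<T>>
{
    auto index = std::size_t{0};
    for (auto* queue : queues)
    {
        if (!queue->empty())
        {
            auto value = std::move(queue->front());
            queue->pop_front();
            return poll_result<T>{index, std::move(value)};
        }
        ++index;
    }
    return std::nullopt;
}

inline void poll_demo()
{
    auto first = std::deque<int>{};
    auto second = std::deque<int>{7};
    auto third = std::deque<int>{9};

    auto const result = poll_first_ready<int>({&first, &second, &third});
    assert(result.has_value());
    assert(result->index == 1 && result->value == 7);
    assert(third.size() == 1);  // the other ready queue was left untouched

    // Nothing is ready any more, so the fallback (empty optional) is taken.
    auto const none = poll_first_ready<int>({&first, &second});
    assert(!none.has_value());
}
```

The check on third mirrors the property stated for select: when several alternatives are ready, only the chosen one has a value consumed.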
The read and write operations can accept multiple channels.
This allows you to select between multiple write channels with the same send_type without copying the sent value:
channel<std::string> chan_1{};
channel<std::string> chan_2{};
std::string long_string = "...";
auto string_send_result = co_await select(
ops::write(std::move(long_string), chan_1, chan_2));
The select_result type remembers the shared state of the channel for which an operation succeeded. This allows disambiguation between channels of the same send_type:
bool sent_to_chan_1 = string_send_result.sent_to(chan_1);
bool sent_to_chan_2 = string_send_result.sent_to(chan_2);
auto string_recv_result = co_await select(
ops::read(chan_1, chan_2));
bool recv_from_chan_1 = string_recv_result.received_from(chan_1);
bool recv_from_chan_2 = string_recv_result.received_from(chan_2);
// Similar to get_if<T>()
std::string* result = string_recv_result.get_if_received_from(chan_1);
The select feature can be useful for implementing timeouts on channel operations.
using std::chrono::steady_clock;
using duration = steady_clock::duration;
using namespace std::literals;
auto set_timeout(
asio::execution::executor auto executor,
duration dur)
-> read_channel<void>
{
auto timer = asio::steady_timer{executor};
timer.expires_after(dur);
auto timeout = channel<void>{};
asio::co_spawn(
executor,
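// steady_timer is move-only, so it is moved into the lambda rather than copied.
[timer = std::move(timer), timeout]() mutable -> asio::awaitable<void> {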
co_await timer.async_wait(asio::use_awaitable);
co_await timeout.write();
},
asio::detached);
return timeout;
}
auto accept_client_requests(
write_channel<std::string> requests_channel)
-> asio::awaitable<void>
{
while (true)
{
auto request_from_client = co_await /* ... */;
co_await requests_channel.write(std::move(request_from_client));
}
}
auto timeout_example()
-> asio::awaitable<void>
{
auto executor = co_await asio::this_coro::executor;
auto requests = channel<std::string>{};
asio::co_spawn(
executor,
accept_client_requests(requests),
asio::detached);
auto timeout = set_timeout(executor, 10s);
auto result = co_await select(
ops::read(requests),
ops::read(timeout));
if (auto* request = result.get_if_received<std::string>())
{
// Handle request...
}
else
{
// Handle timeout...
}
}
By default, Boost.ASIO is used. To use with standalone ASIO:
- When consuming as a Conan package - set the option asio=standalone
- When consuming as a CMake subproject - set the cache variable ASIOCHAN_USE_STANDALONE_ASIO=ON
- When consuming as headers - define the ASIOCHAN_USE_STANDALONE_ASIO macro
If you use Conan to manage dependencies, you can install the library by cloning the repository and running conan create <path-to-repo>.