Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a type-erased response adapter to the public API #201

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
80 changes: 80 additions & 0 deletions include/boost/redis/adapter/any_adapter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#ifndef BOOST_REDIS_ANY_ADAPTER_HPP
#define BOOST_REDIS_ANY_ADAPTER_HPP


#include <boost/redis/resp3/node.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <functional>
#include <string_view>
#include <type_traits>

namespace boost::redis {

namespace detail {

// Forward decl
template <class Executor>
class connection_base;

}

/** @brief A type-erased reference to a response.
* @ingroup high-level-api
*
* A type-erased response adapter. It can be executed using @ref connection::async_exec.
* Using this type instead of raw response references enables separate compilation.
*
* Given a response object `resp` that can be passed to `async_exec`, the following two
* statements have the same effect:
* ```
* co_await conn.async_exec(req, resp);
* co_await conn.async_exec(req, any_response(resp));
* ```
*/
class any_adapter
{
using fn_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;

struct impl_t {
fn_type adapt_fn;
std::size_t supported_response_size;
} impl_;

template <class T>
static auto create_impl(T& resp) -> impl_t
{
using namespace boost::redis::adapter;
auto adapter = boost_redis_adapt(resp);
std::size_t size = adapter.get_supported_response_size();
return { std::move(adapter), size };
}

template <class Executor>
friend class detail::connection_base;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be no friend class here, the class should define

void operator(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)

and forward the call directly to impl_. Once you do that you can also remove this typedef and replace it with any_adapter AFAICS.

In a later issue we will have to think about how this relates to the receiver adapter or whether we need another any adapter class. This will be needed to type erase the receive operation too

Copy link
Collaborator Author

@anarthal anarthal Aug 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense only if any_adapter is a public class - otherwise I can remove it (together with the Doxygen docs). See my question above.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let us keep it public.


public:
/**
* @brief Constructor.
*
* Creates a type-erased response adapter from `resp` by calling
* `boost_redis_adapt`. `T` must be a valid Redis response type.
* Any type passed to @ref connection::async_exec qualifies.
*
* This object stores a reference to `resp`, which must be kept alive
* while `*this` is being used.
*/
template <class T, class = std::enable_if_t<!std::is_same_v<T, any_adapter>>>
explicit any_adapter(T& resp) : impl_(create_impl(resp)) {}
};

}

#endif
42 changes: 38 additions & 4 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_CONNECTION_HPP
#define BOOST_REDIS_CONNECTION_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/connection_base.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/config.hpp>
Expand All @@ -17,7 +18,7 @@
#include <boost/asio/any_completion_handler.hpp>

#include <chrono>
#include <memory>
#include <cstddef>
#include <limits>

namespace boost::redis {
Expand Down Expand Up @@ -256,7 +257,22 @@ class basic_connection {
Response& resp = ignore,
CompletionToken&& token = CompletionToken{})
{
return impl_.async_exec(req, resp, std::forward<CompletionToken>(token));
return impl_.async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
}

/** @copydoc async_exec
*
* @details This function uses the type-erased @ref any_adapter class, which
* encapsulates a reference to a response object.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto
async_exec(
request const& req,
any_adapter adapter,
CompletionToken&& token = CompletionToken{})
{
return impl_.async_exec(req, std::move(adapter), std::forward<CompletionToken>(token));
}

/** @brief Cancel operations.
Expand Down Expand Up @@ -392,9 +408,21 @@ class connection {

/// Calls `boost::redis::basic_connection::async_exec`.
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
auto async_exec(request const& req, Response& resp, CompletionToken&& token)
{
return impl_.async_exec(req, resp, std::move(token));
return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
}

/// Calls `boost::redis::basic_connection::async_exec`.
template <class CompletionToken>
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token)
mzimbres marked this conversation as resolved.
Show resolved Hide resolved
{
return asio::async_initiate<
CompletionToken, void(boost::system::error_code, std::size_t)>(
[](auto handler, connection* self, request const* req, any_adapter&& adapter)
{
self->async_exec_impl(*req, std::move(adapter), std::move(handler));
}, token, this, &req, std::move(adapter));
}

/// Calls `boost::redis::basic_connection::cancel`.
Expand Down Expand Up @@ -435,6 +463,12 @@ class connection {
config const& cfg,
logger l,
asio::any_completion_handler<void(boost::system::error_code)> token);

void
async_exec_impl(
request const& req,
any_adapter&& adapter,
asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);

basic_connection<executor_type> impl_;
};
Expand Down
18 changes: 11 additions & 7 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
#define BOOST_REDIS_CONNECTION_BASE_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/error.hpp>
Expand All @@ -30,6 +31,7 @@
#include <boost/asio/read_until.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/associated_immediate_executor.hpp>

#include <algorithm>
#include <array>
Expand All @@ -39,6 +41,7 @@
#include <string_view>
#include <type_traits>
#include <functional>
#include <utility>

namespace boost::redis::detail
{
Expand Down Expand Up @@ -121,7 +124,9 @@ struct exec_op {
// be stablished.
if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
BOOST_ASIO_CORO_YIELD
asio::post(std::move(self));
asio::dispatch(
asio::get_associated_immediate_executor(self, self.get_io_executor()),
std::move(self));
return self.complete(error::not_connected, 0);
}

Expand Down Expand Up @@ -440,14 +445,13 @@ class connection_base {
cancel_impl(op);
}

template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
template <class CompletionToken>
auto async_exec(request const& req, any_adapter&& adapter, CompletionToken&& token)
{
using namespace boost::redis::adapter;
auto f = boost_redis_adapt(resp);
BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");
auto& adapter_impl = adapter.impl_;
BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes.");

auto info = std::make_shared<req_info>(req, f, get_executor());
auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn), get_executor());

return asio::async_compose
< CompletionToken
Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/detail/health_checker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/config.hpp>
#include <boost/asio/steady_timer.hpp>
Expand Down Expand Up @@ -44,7 +45,7 @@ class ping_op {
}

BOOST_ASIO_CORO_YIELD
conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
if (ec || is_cancelled(self)) {
logger_.trace("ping_op: error/cancelled (1).");
checker_->wait_timer_.cancel();
Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/detail/runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_RUNNER_HPP
#define BOOST_REDIS_RUNNER_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/response.hpp>
Expand Down Expand Up @@ -47,7 +48,7 @@ struct hello_op {
runner_->add_hello();

BOOST_ASIO_CORO_YIELD
conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
conn_->async_exec(runner_->hello_req_, any_adapter(runner_->hello_resp_), std::move(self));
logger_.on_hello(ec, runner_->hello_resp_);

if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
Expand Down
10 changes: 10 additions & 0 deletions include/boost/redis/impl/connection.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <boost/redis/connection.hpp>
#include <cstddef>

namespace boost::redis {

Expand All @@ -31,6 +32,15 @@ connection::async_run_impl(
impl_.async_run(cfg, l, std::move(token));
}

void
connection::async_exec_impl(
request const& req,
any_adapter&& adapter,
asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token)
{
impl_.async_exec(req, std::move(adapter), std::move(token));
}

void connection::cancel(operation op)
{
impl_.cancel(op);
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ make_test(test_conn_exec_cancel 20)
make_test(test_conn_exec_cancel2 20)
make_test(test_conn_echo_stress 20)
make_test(test_conn_run_cancel 20)
make_test(test_any_adapter 17)
make_test(test_issue_50 20)
make_test(test_issue_181 17)

Expand Down
1 change: 1 addition & 0 deletions test/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ local tests =
test_low_level
test_request
test_run
test_any_adapter
;

# Build and run the tests
Expand Down
49 changes: 49 additions & 0 deletions test/test_any_adapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#include <boost/redis/ignore.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <string>
#define BOOST_TEST_MODULE any_adapter
#include <boost/test/included/unit_test.hpp>

using boost::redis::generic_response;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::any_adapter;

BOOST_AUTO_TEST_CASE(any_adapter_response_types)
{
// any_adapter can be used with any supported responses
response<int> r1;
response<int, std::string> r2;
generic_response r3;

BOOST_CHECK_NO_THROW(any_adapter{r1});
BOOST_CHECK_NO_THROW(any_adapter{r2});
BOOST_CHECK_NO_THROW(any_adapter{r3});
BOOST_CHECK_NO_THROW(any_adapter{ignore});
}

BOOST_AUTO_TEST_CASE(any_adapter_copy_move)
{
// any_adapter can be copied/moved
response<int, std::string> r;
any_adapter ad1 {r};

// copy constructor
any_adapter ad2 {ad1};

// move constructor
any_adapter ad3 {std::move(ad2)};

// copy assignment
BOOST_CHECK_NO_THROW(ad2 = ad1);

// move assignment
BOOST_CHECK_NO_THROW(ad2 = std::move(ad1));
}
24 changes: 24 additions & 0 deletions test/test_conn_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
* accompanying file LICENSE.txt)
*/

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/connection.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/detached.hpp>
#include <string>
#define BOOST_TEST_MODULE conn-exec
#include <boost/test/included/unit_test.hpp>
#include <iostream>
Expand Down Expand Up @@ -188,3 +190,25 @@ BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170)
BOOST_CHECK_EQUAL(counter, repeat);
}

BOOST_AUTO_TEST_CASE(exec_any_response)
{
// Executing an any_response object works
request req;
req.push("PING", "PONG");
response<std::string> res;

net::io_context ioc;

auto conn = std::make_shared<connection>(ioc);

conn->async_exec(req, boost::redis::any_adapter(res), [&](auto ec, auto){
BOOST_TEST(!ec);
conn->cancel();
});

run(conn);
ioc.run();

BOOST_TEST(std::get<0>(res).value() == "PONG");
}