Skip to content

Commit

Permalink
kafka cpp server: improvements (#1765)
Browse files Browse the repository at this point in the history
- using an io_context-per-CPU design, the default size is 1
- add support for resolving bind address
  • Loading branch information
4eUeP authored Feb 27, 2024
1 parent 16657b8 commit ed63d26
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 35 deletions.
16 changes: 10 additions & 6 deletions hstream-kafka/HStream/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,19 @@ data ServerOptions = ServerOptions
, serverSslOptions :: !(Maybe SslOptions)
, serverSaslOptions :: !(Maybe SaslOptions)
, serverOnStarted :: !(Maybe (IO ()))
, ioContextPoolSize :: !Word64
-- ^ The number of io_contexts in the pool. The default value is 1.
-- Only for c++ server.
}

defaultServerOpts :: ServerOptions
defaultServerOpts = ServerOptions
{ serverHost = "0.0.0.0"
, serverPort = 9092
, serverSslOptions = Nothing
, serverSaslOptions = Nothing
, serverOnStarted = Nothing
{ serverHost = "0.0.0.0"
, serverPort = 9092
, serverSslOptions = Nothing
, serverSaslOptions = Nothing
, serverOnStarted = Nothing
, ioContextPoolSize = 1
}

-------------------------------------------------------------------------------
Expand Down Expand Up @@ -211,7 +215,7 @@ runCppServer opts sc_ mkAuthedHandlers =
-- 'io_context' inside it will be invalid, while there may still be
-- Haskell threads using it (for example, the 'Cxx.release_lock' in
-- 'processorCallback'), which would cause a crash.
server <- Cxx.new_kafka_server
server <- Cxx.new_kafka_server (fromIntegral opts.ioContextPoolSize)
let start = Cxx.run_kafka_server server hostPtr opts.serverPort
cb connCb cfdOnStarted
stop a = Cxx.stop_kafka_server server >> Async.wait a
Expand Down
4 changes: 3 additions & 1 deletion hstream-kafka/HStream/Kafka/Network/Cxx.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ withConnContextCallback cb =
data CppKafkaServer

foreign import ccall unsafe "new_kafka_server"
new_kafka_server :: IO (Ptr CppKafkaServer)
new_kafka_server
:: CSize -- ^ size of io_context_pool
-> IO (Ptr CppKafkaServer)

-- | Binding to the C function @delete_kafka_server@. It takes the pointer to
-- the C++ server object that should be destroyed.
foreign import ccall unsafe "delete_kafka_server"
delete_kafka_server :: Ptr CppKafkaServer -> IO ()
Expand Down
149 changes: 121 additions & 28 deletions hstream-kafka/cbits/hs_kafka_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
#include <asio/use_awaitable.hpp>
#include <asio/write.hpp>
#include <iostream>
#include <list>
#include <thread>

#include "hs_kafka_server.h"

// ----------------------------------------------------------------------------

static void writeBE(int32_t value, uint8_t bytes[4]) {
bytes[0] = (value >> 24) & 0xFF;
bytes[1] = (value >> 16) & 0xFF;
Expand All @@ -25,6 +29,96 @@ static int32_t readBE(uint8_t bytes[4]) {
return (bytes[0] << 24) | (bytes[1] << 16) | (bytes[2] << 8) | bytes[3];
}

// Writes the 64-bit value 1 to `fd` to signal an event to whoever is reading
// from it. A descriptor of -1 means "nothing to notify" and is ignored.
//
// A failed or short write is only reported on stderr; nothing is returned to
// the caller.
//
// NOTE(review): the 8-byte counter write suggests `fd` is an eventfd --
// confirm against the caller that creates it.
void hs_event_notify(int& fd) {
  if (fd != -1) {
    const uint64_t increment = 1;
    const ssize_t written = write(fd, &increment, sizeof(increment));
    // A result of -1 converts to a huge unsigned value, so it is treated as a
    // failure just like a short write.
    if (static_cast<std::size_t>(written) != sizeof(uint64_t))
      fprintf(stderr, "write to fd %d failed!", fd);
  }
}

// Copied from the asio example:
//
// https://think-async.com/Asio/asio-1.28.0/doc/asio/examples/cpp03_examples.html#asio.examples.cpp03_examples.http_server_2
//
// Using an io_context-per-CPU design.
//
// TODO: How about "using a single io_context and a thread pool calling
// io_context::run()" as this example does:
//
// https://think-async.com/Asio/asio-1.28.0/doc/asio/examples/cpp03_examples.html#asio.examples.cpp03_examples.http_server_3
class io_context_pool {
public:
/// Construct the io_context pool.
explicit io_context_pool(std::size_t pool_size) : next_io_context_(0) {
if (pool_size == 0)
throw std::runtime_error("io_context_pool size is 0");

// Give all the io_contexts work to do so that their run() functions will
// not exit until they are explicitly stopped.
for (std::size_t i = 0; i < pool_size; ++i) {
// FIXME: does it make sense to use concurrency_hint = 1?
//
// https://think-async.com/Asio/asio-1.28.0/doc/asio/overview/core/concurrency_hint.html
io_context_ptr io_context(new asio::io_context(1));
io_contexts_.push_back(io_context);
work_.push_back(asio::make_work_guard(*io_context));
}
}

/// Run all io_context objects in the pool.
void run() {
// Create a pool of threads to run all of the io_contexts.
std::vector<std::thread> threads;
for (std::size_t i = 0; i < io_contexts_.size(); ++i)
threads.emplace_back([this, i] { io_contexts_[i]->run(); });

// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i)
threads[i].join();
}

/// Stop all io_context objects in the pool.
void stop() {
// Explicitly stop all io_contexts.
for (std::size_t i = 0; i < io_contexts_.size(); ++i)
io_contexts_[i]->stop();
}

/// Get an io_context to use.
asio::io_context& get_io_context() {
// Use a round-robin scheme to choose the next io_context to use.
asio::io_context& io_context = *io_contexts_[next_io_context_];
++next_io_context_;
if (next_io_context_ == io_contexts_.size())
next_io_context_ = 0;
return io_context;
}

private:
io_context_pool(const io_context_pool&) = delete;
io_context_pool& operator=(const io_context_pool&) = delete;

typedef std::shared_ptr<asio::io_context> io_context_ptr;
typedef asio::executor_work_guard<asio::io_context::executor_type>
io_context_work;

/// The pool of io_contexts.
std::vector<io_context_ptr> io_contexts_;

/// The work that keeps the io_contexts running.
std::list<io_context_work> work_;

/// The next io_context to use for a connection.
std::size_t next_io_context_;
};

// ----------------------------------------------------------------------------

class ServerHandler : public std::enable_shared_from_this<ServerHandler> {
public:
ServerHandler(asio::ip::tcp::socket socket, HsCallback& callback,
Expand Down Expand Up @@ -122,22 +216,12 @@ class ServerHandler : public std::enable_shared_from_this<ServerHandler> {
HsCallback& callback_;
};

// Writes the 64-bit value 1 to `fd` to signal an event to whoever is reading
// from it. A descriptor of -1 means "nothing to notify" and is ignored.
//
// A failed or short write is only reported on stderr; nothing is returned to
// the caller.
//
// NOTE(review): the 8-byte counter write suggests `fd` is an eventfd --
// confirm against the caller that creates it.
void hs_event_notify(int& fd) {
  if (fd != -1) {
    const uint64_t increment = 1;
    const ssize_t written = write(fd, &increment, sizeof(increment));
    // A result of -1 converts to a huge unsigned value, so it is treated as a
    // failure just like a short write.
    if (static_cast<std::size_t>(written) != sizeof(uint64_t))
      fprintf(stderr, "write to fd %d failed!", fd);
  }
}

asio::awaitable<void> listener(asio::ip::tcp::acceptor acceptor,
io_context_pool& context_pool,
HsCallback callback, HsNewStablePtr newConnCtx) {
for (;;) {
auto socket = co_await acceptor.async_accept(asio::use_awaitable);
auto socket = co_await acceptor.async_accept(context_pool.get_io_context(),
asio::use_awaitable);
auto peer_host = socket.remote_endpoint().address().to_string();
conn_context_t conn_ctx{peer_host.data(), peer_host.size()};
auto sp = newConnCtx(&conn_ctx);
Expand All @@ -150,27 +234,36 @@ asio::awaitable<void> listener(asio::ip::tcp::acceptor acceptor,
extern "C" {

struct Server {
asio::io_context io_context{1};
io_context_pool io_context_pool_;

explicit Server(std::size_t io_context_pool_size)
: io_context_pool_(io_context_pool_size) {}
};

Server* new_kafka_server() { return new Server(); }
// Allocates a Server whose io_context pool holds `io_context_pool_size`
// contexts and returns the raw pointer. The object is released with
// delete_kafka_server().
//
// NOTE(review): the pool constructor throws std::runtime_error for a size of
// 0, and this is an extern "C" entry point -- confirm callers never pass 0.
Server* new_kafka_server(std::size_t io_context_pool_size) {
  Server* const server = new Server(io_context_pool_size);
  return server;
}

void run_kafka_server(Server* server, const char* host, uint16_t port,
HsCallback callback, HsNewStablePtr newConnCtx,
int fd_on_started) {
// Create an address from an IPv4 address string in dotted decimal form, or
// from an IPv6 address in hexadecimal notation.
//
// FIXME: what if the host is a domain? e.g. 'localhost'
auto addr = asio::ip::make_address(host);
auto& context_pool = server->io_context_pool_;

asio::ip::tcp::acceptor acceptor(context_pool.get_io_context());
asio::ip::tcp::resolver resolver(acceptor.get_executor());
asio::ip::tcp::endpoint endpoint =
*resolver.resolve(std::string(host), std::to_string(port)).begin();
free((void*)host);
auto& io_context = server->io_context;

asio::co_spawn(io_context,
listener(asio::ip::tcp::acceptor(io_context, {addr, port},
true /*reuse_addr*/),
callback, newConnCtx),
asio::detached);
acceptor.open(endpoint.protocol());
acceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen();

asio::co_spawn(
context_pool.get_io_context(),
listener(std::move(acceptor), context_pool, callback, newConnCtx),
asio::detached);

// FIXME: Do we need to handle SIGINT and SIGTERM?
//
Expand All @@ -179,10 +272,10 @@ void run_kafka_server(Server* server, const char* host, uint16_t port,

hs_event_notify(fd_on_started);

io_context.run();
context_pool.run();
}

void stop_kafka_server(Server* server) { server->io_context.stop(); }
// Stops every io_context in the server's pool.
void stop_kafka_server(Server* server) {
  auto& pool = server->io_context_pool_;
  pool.stop();
}

// Destroys a Server that was allocated with `new` (see new_kafka_server).
//
// NOTE(review): presumably this must only happen once nothing is using the
// server's io_contexts any more -- confirm against the caller.
void delete_kafka_server(Server* server) {
  delete server;
}

Expand Down

0 comments on commit ed63d26

Please sign in to comment.