diff --git a/hstream-kafka/HStream/Kafka/Network.hs b/hstream-kafka/HStream/Kafka/Network.hs index 6258e940c..b9f1542e4 100644 --- a/hstream-kafka/HStream/Kafka/Network.hs +++ b/hstream-kafka/HStream/Kafka/Network.hs @@ -70,15 +70,19 @@ data ServerOptions = ServerOptions , serverSslOptions :: !(Maybe SslOptions) , serverSaslOptions :: !(Maybe SaslOptions) , serverOnStarted :: !(Maybe (IO ())) + , ioContextPoolSize :: !Word64 + -- ^ The number of io_contexts in the pool. The default value is 1. + -- Only for c++ server. } defaultServerOpts :: ServerOptions defaultServerOpts = ServerOptions - { serverHost = "0.0.0.0" - , serverPort = 9092 - , serverSslOptions = Nothing - , serverSaslOptions = Nothing - , serverOnStarted = Nothing + { serverHost = "0.0.0.0" + , serverPort = 9092 + , serverSslOptions = Nothing + , serverSaslOptions = Nothing + , serverOnStarted = Nothing + , ioContextPoolSize = 1 } ------------------------------------------------------------------------------- @@ -211,7 +215,7 @@ runCppServer opts sc_ mkAuthedHandlers = -- 'io_context' inside it will be invalid. And there are potential -- haskell threads that are still using it. For example, the -- 'Cxx.release_lock' in 'processorCallback'. Which will cause a crash. - server <- Cxx.new_kafka_server + server <- Cxx.new_kafka_server (fromIntegral opts.ioContextPoolSize) let start = Cxx.run_kafka_server server hostPtr opts.serverPort cb connCb cfdOnStarted stop a = Cxx.stop_kafka_server server >> Async.wait a diff --git a/hstream-kafka/HStream/Kafka/Network/Cxx.hsc b/hstream-kafka/HStream/Kafka/Network/Cxx.hsc index 2a73802b3..bc34b0c69 100644 --- a/hstream-kafka/HStream/Kafka/Network/Cxx.hsc +++ b/hstream-kafka/HStream/Kafka/Network/Cxx.hsc @@ -151,7 +151,9 @@ withConnContextCallback cb = data CppKafkaServer foreign import ccall unsafe "new_kafka_server" - new_kafka_server :: IO (Ptr CppKafkaServer) + new_kafka_server + :: CSize -- ^ size of io_context_pool + -> IO (Ptr CppKafkaServer) foreign import ccall unsafe "delete_kafka_server" delete_kafka_server :: Ptr CppKafkaServer -> IO () diff --git a/hstream-kafka/cbits/hs_kafka_server.cpp b/hstream-kafka/cbits/hs_kafka_server.cpp index bad64ef79..0cfbb1ad0 100644 --- a/hstream-kafka/cbits/hs_kafka_server.cpp +++ b/hstream-kafka/cbits/hs_kafka_server.cpp @@ -11,9 +11,13 @@ #include #include #include +#include +#include #include "hs_kafka_server.h" +// ---------------------------------------------------------------------------- + static void writeBE(int32_t value, uint8_t bytes[4]) { bytes[0] = (value >> 24) & 0xFF; bytes[1] = (value >> 16) & 0xFF; @@ -25,6 +29,96 @@ static int32_t readBE(uint8_t bytes[4]) { return (bytes[0] << 24) | (bytes[1] << 16) | (bytes[2] << 8) | bytes[3]; } +void hs_event_notify(int& fd) { + if (fd == -1) + return; + + uint64_t u = 1; + ssize_t s = write(fd, &u, sizeof(uint64_t)); + if (s != sizeof(uint64_t)) { + fprintf(stderr, "write to fd %d failed!", fd); + return; + } +} + +// Copy from asio example: +// +// https://think-async.com/Asio/asio-1.28.0/doc/asio/examples/cpp03_examples.html#asio.examples.cpp03_examples.http_server_2 +// +// Using an io_context-per-CPU design. +// +// TODO: How about "using a single io_context and a thread pool calling +// io_context::run()" as this example does: +// +// https://think-async.com/Asio/asio-1.28.0/doc/asio/examples/cpp03_examples.html#asio.examples.cpp03_examples.http_server_3 +class io_context_pool { +public: + /// Construct the io_context pool. + explicit io_context_pool(std::size_t pool_size) : next_io_context_(0) { + if (pool_size == 0) + throw std::runtime_error("io_context_pool size is 0"); + + // Give all the io_contexts work to do so that their run() functions will + // not exit until they are explicitly stopped. + for (std::size_t i = 0; i < pool_size; ++i) { + // FIXME: does it make sense to use concurrency_hint = 1? + // + // https://think-async.com/Asio/asio-1.28.0/doc/asio/overview/core/concurrency_hint.html + io_context_ptr io_context(new asio::io_context(1)); + io_contexts_.push_back(io_context); + work_.push_back(asio::make_work_guard(*io_context)); + } + } + + /// Run all io_context objects in the pool. + void run() { + // Create a pool of threads to run all of the io_contexts. + std::vector threads; + for (std::size_t i = 0; i < io_contexts_.size(); ++i) + threads.emplace_back([this, i] { io_contexts_[i]->run(); }); + + // Wait for all threads in the pool to exit. + for (std::size_t i = 0; i < threads.size(); ++i) + threads[i].join(); + } + + /// Stop all io_context objects in the pool. + void stop() { + // Explicitly stop all io_contexts. + for (std::size_t i = 0; i < io_contexts_.size(); ++i) + io_contexts_[i]->stop(); + } + + /// Get an io_context to use. + asio::io_context& get_io_context() { + // Use a round-robin scheme to choose the next io_context to use. + asio::io_context& io_context = *io_contexts_[next_io_context_]; + ++next_io_context_; + if (next_io_context_ == io_contexts_.size()) + next_io_context_ = 0; + return io_context; + } + +private: + io_context_pool(const io_context_pool&) = delete; + io_context_pool& operator=(const io_context_pool&) = delete; + + typedef std::shared_ptr io_context_ptr; + typedef asio::executor_work_guard + io_context_work; + + /// The pool of io_contexts. + std::vector io_contexts_; + + /// The work that keeps the io_contexts running. + std::list work_; + + /// The next io_context to use for a connection. + std::size_t next_io_context_; +}; + +// ---------------------------------------------------------------------------- + class ServerHandler : public std::enable_shared_from_this { public: ServerHandler(asio::ip::tcp::socket socket, HsCallback& callback, @@ -122,22 +216,12 @@ class ServerHandler : public std::enable_shared_from_this { HsCallback& callback_; }; -void hs_event_notify(int& fd) { - if (fd == -1) - return; - - uint64_t u = 1; - ssize_t s = write(fd, &u, sizeof(uint64_t)); - if (s != sizeof(uint64_t)) { - fprintf(stderr, "write to fd %d failed!", fd); - return; - } -} - asio::awaitable listener(asio::ip::tcp::acceptor acceptor, + io_context_pool& context_pool, HsCallback callback, HsNewStablePtr newConnCtx) { for (;;) { - auto socket = co_await acceptor.async_accept(asio::use_awaitable); + auto socket = co_await acceptor.async_accept(context_pool.get_io_context(), + asio::use_awaitable); auto peer_host = socket.remote_endpoint().address().to_string(); conn_context_t conn_ctx{peer_host.data(), peer_host.size()}; auto sp = newConnCtx(&conn_ctx); @@ -150,27 +234,36 @@ asio::awaitable listener(asio::ip::tcp::acceptor acceptor, extern "C" { struct Server { - asio::io_context io_context{1}; + io_context_pool io_context_pool_; + + explicit Server(std::size_t io_context_pool_size) + : io_context_pool_(io_context_pool_size) {} }; -Server* new_kafka_server() { return new Server(); } +Server* new_kafka_server(std::size_t io_context_pool_size) { + return new Server(io_context_pool_size); +} void run_kafka_server(Server* server, const char* host, uint16_t port, HsCallback callback, HsNewStablePtr newConnCtx, int fd_on_started) { - // Create an address from an IPv4 address string in dotted decimal form, or - // from an IPv6 address in hexadecimal notation. - // - // FIXME: what if the host is a domain? e.g. 'localhost' - auto addr = asio::ip::make_address(host); + auto& context_pool = server->io_context_pool_; + + asio::ip::tcp::acceptor acceptor(context_pool.get_io_context()); + asio::ip::tcp::resolver resolver(acceptor.get_executor()); + asio::ip::tcp::endpoint endpoint = + *resolver.resolve(std::string(host), std::to_string(port)).begin(); free((void*)host); - auto& io_context = server->io_context; - asio::co_spawn(io_context, - listener(asio::ip::tcp::acceptor(io_context, {addr, port}, - true /*reuse_addr*/), - callback, newConnCtx), - asio::detached); + acceptor.open(endpoint.protocol()); + acceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true)); + acceptor.bind(endpoint); + acceptor.listen(); + + asio::co_spawn( + context_pool.get_io_context(), + listener(std::move(acceptor), context_pool, callback, newConnCtx), + asio::detached); // FIXME: Do we need to handle SIGINT and SIGTERM? // @@ -179,10 +272,10 @@ void run_kafka_server(Server* server, const char* host, uint16_t port, hs_event_notify(fd_on_started); - io_context.run(); + context_pool.run(); } -void stop_kafka_server(Server* server) { server->io_context.stop(); } +void stop_kafka_server(Server* server) { server->io_context_pool_.stop(); } void delete_kafka_server(Server* server) { delete server; }