Skip to content

Commit

Permalink
Merge pull request #330 from akumuli/fix-328
Browse files Browse the repository at this point in the history
Bind server to specific interface
  • Loading branch information
Lazin authored Dec 11, 2019
2 parents d4fbc66 + d7c6627 commit fa63655
Show file tree
Hide file tree
Showing 13 changed files with 232 additions and 110 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
Release notes
=============

Version 0.8.73
--------------

BUG FIX

* Fix #328 group-by query crash

IMPROVEMENT

* Bind HTTP/TCP/UDP server to specific iface

Version 0.8.72
--------------

Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8)

set(APP_VERSION_MAJOR "0")
set(APP_VERSION_MINOR "8")
set(APP_VERSION_PATCH "72")
set(APP_VERSION_PATCH "73")

set(APP_VERSION "${APP_VERSION_MAJOR}.${APP_VERSION_MINOR}.${APP_VERSION_PATCH}")
add_definitions(-DAKU_VERSION="${APP_VERSION}")
Expand Down
14 changes: 8 additions & 6 deletions akumulid/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,27 +152,29 @@ static int accept_connection(void *cls,
}
}

HttpServer::HttpServer(unsigned short port, std::shared_ptr<ReadOperationBuilder> qproc, AccessControlList const& acl)
HttpServer::HttpServer(boost::asio::ip::tcp::endpoint const& endpoint, std::shared_ptr<ReadOperationBuilder> qproc, AccessControlList const& acl)
: acl_(acl)
, proc_(qproc)
, port_(port)
, endpoint_(endpoint)
, daemon_(nullptr) // `start` should be called to initialize daemon_ correctly
{
}

HttpServer::HttpServer(unsigned short port, std::shared_ptr<ReadOperationBuilder> qproc)
: HttpServer(port, qproc, AccessControlList())
HttpServer::HttpServer(boost::asio::ip::tcp::endpoint const& endpoint, std::shared_ptr<ReadOperationBuilder> qproc)
: HttpServer(endpoint, qproc, AccessControlList())
{
}

void HttpServer::start(SignalHandler* sig, int id) {
logger.info() << "Start MHD daemon";
daemon_ = MHD_start_daemon(MHD_USE_THREAD_PER_CONNECTION,
port_,
endpoint_.port(),
NULL,
NULL,
&MHD::accept_connection,
proc_.get(),
MHD_OPTION_SOCK_ADDR,
endpoint_.data(),
MHD_OPTION_END);
if (daemon_ == nullptr) {
BOOST_THROW_EXCEPTION(std::runtime_error("can't start daemon"));
Expand Down Expand Up @@ -202,7 +204,7 @@ struct HttpServerBuilder {
s_logger_.error() << "Can't initialize HTTP server, more than one protocol specified";
BOOST_THROW_EXCEPTION(std::runtime_error("invalid http-server settings"));
}
return std::make_shared<HttpServer>(settings.protocols.front().port, qproc);
return std::make_shared<HttpServer>(settings.protocols.front().endpoint, qproc);
}
};

Expand Down
7 changes: 4 additions & 3 deletions akumulid/httpserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <tuple>

#include <microhttpd.h>
#include <boost/asio.hpp>

#include "akumuli.h"
#include "logger.h"
Expand All @@ -33,11 +34,11 @@ struct AccessControlList {}; // TODO: implement ACL
struct HttpServer : std::enable_shared_from_this<HttpServer>, Server {
AccessControlList acl_;
std::shared_ptr<ReadOperationBuilder> proc_;
unsigned short port_;
boost::asio::ip::tcp::endpoint endpoint_;
MHD_Daemon* daemon_;

HttpServer(unsigned short port, std::shared_ptr<ReadOperationBuilder> qproc);
HttpServer(unsigned short port, std::shared_ptr<ReadOperationBuilder> qproc,
HttpServer(const boost::asio::ip::tcp::endpoint &endpoint, std::shared_ptr<ReadOperationBuilder> qproc);
HttpServer(const boost::asio::ip::tcp::endpoint &endpoint, std::shared_ptr<ReadOperationBuilder> qproc,
AccessControlList const& acl);

virtual void start(SignalHandler* handler, int id);
Expand Down
61 changes: 52 additions & 9 deletions akumulid/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,25 +264,68 @@ struct ConfigFile {
static ServerSettings get_http_server(PTree conf) {
ServerSettings settings;
settings.name = "HTTP";
settings.protocols.push_back({ "HTTP", conf.get<int>("HTTP.port")});
settings.nworkers = -1;
auto ip = conf.get_optional<std::string>("HTTP.bind_addr");
if (ip) {
auto addr = boost::asio::ip::address_v4::from_string(*ip);
boost::asio::ip::tcp::endpoint endpoint(addr, conf.get<unsigned short>("HTTP.port"));
settings.protocols.push_back({ "HTTP", endpoint});
settings.nworkers = -1;
}
else {
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(),
conf.get<unsigned short>("HTTP.port"));
settings.protocols.push_back({ "HTTP", endpoint });
settings.nworkers = -1;
}
return settings;
}

static ServerSettings get_udp_server(PTree conf) {
ServerSettings settings;
settings.name = "UDP";
settings.protocols.push_back({ "UDP", conf.get<int>("UDP.port")});
settings.nworkers = conf.get<int>("UDP.pool_size");
auto ip = conf.get_optional<std::string>("UDP.bind_addr");
if (ip) {
auto addr = boost::asio::ip::address_v4::from_string(*ip);
boost::asio::ip::tcp::endpoint endpoint(addr, conf.get<unsigned short>("UDP.port"));
settings.protocols.push_back({ "UDP", endpoint});
settings.nworkers = conf.get<int>("UDP.pool_size");
}
else {
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(),
conf.get<unsigned short>("UDP.port"));
settings.protocols.push_back({ "UDP", endpoint });
settings.nworkers = conf.get<int>("UDP.pool_size");
}
return settings;
}

static ServerSettings get_tcp_server(PTree conf) {
ServerSettings settings;
settings.name = "TCP";
settings.protocols.push_back({ "RESP", conf.get<int>("TCP.port")});
auto ip = conf.get_optional<std::string>("TCP.bind_addr");
if (ip) {
auto addr = boost::asio::ip::address_v4::from_string(*ip);
boost::asio::ip::tcp::endpoint endpoint(addr, conf.get<unsigned short>("TCP.port"));
settings.protocols.push_back({ "RESP", endpoint });
}
else {
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(),
conf.get<unsigned short>("TCP.port"));
settings.protocols.push_back({ "RESP", endpoint });
}

if (conf.count("OpenTSDB")) {
settings.protocols.push_back({ "OpenTSDB", conf.get<int>("OpenTSDB.port")});
auto oip = conf.get_optional<std::string>("OpenTSDB.bind_addr");
if (oip) {
auto addr = boost::asio::ip::address_v4::from_string(*oip);
boost::asio::ip::tcp::endpoint endpoint(addr, conf.get<unsigned short>("OpenTSDB.port"));
settings.protocols.push_back({ "OpenTSDB", endpoint });
}
else {
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(),
conf.get<unsigned short>("OpenTSDB.port"));
settings.protocols.push_back({ "OpenTSDB", endpoint });
}
}
settings.nworkers = conf.get<int>("TCP.pool_size");
return settings;
Expand Down Expand Up @@ -518,13 +561,13 @@ void cmd_run_server(boost::optional<std::string> cmd_config_path) {
logger.info() << "Starting " << settings.name << " index " << srvid;
if (settings.protocols.size() == 1) {
std::cout << cli_format("**OK** ") << settings.name
<< " server started, port: " << settings.protocols[0].port << std::endl;
<< " server started, endpoint: " << settings.protocols[0].endpoint << std::endl;
} else {
std::cout << cli_format("**OK** ") << settings.name
<< " server started";
for (const auto& protocol: settings.protocols) {
std::cout << ", " << protocol.name << " port: " << protocol.port;
logger.info() << "Protocol: " << protocol.name << " port: " << protocol.port;
std::cout << ", " << protocol.name << " endpoint: " << protocol.endpoint;
logger.info() << "Protocol: " << protocol.name << " endpoint: " << protocol.endpoint;
}
std::cout << std::endl;
}
Expand Down
6 changes: 4 additions & 2 deletions akumulid/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "storage_api.h"
#include "signal_handler.h"

#include <boost/asio.hpp>

#include <map>
#include <string>
#include <tuple>
Expand All @@ -11,8 +13,8 @@
namespace Akumuli {

struct ProtocolSettings {
std::string name;
int port;
std::string name;
boost::asio::ip::tcp::endpoint endpoint;
};

struct ServerSettings {
Expand Down
31 changes: 16 additions & 15 deletions akumulid/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,12 @@ std::unique_ptr<ProtocolSessionBuilder> ProtocolSessionBuilder::create_opentsdb_
// //

TcpAcceptor::TcpAcceptor(// Server parameters
std::vector<IOServiceT *> io, int port,
std::vector<IOServiceT *> io, EndpointT endpoint,
// Storage & pipeline
std::shared_ptr<DbConnection> connection , bool parallel)
std::shared_ptr<DbConnection> connection ,
bool parallel)
//: parallel_(parallel)
: acceptor_(own_io_, EndpointT(boost::asio::ip::tcp::v4(), static_cast<u16>(port)))
: acceptor_(own_io_, endpoint)
, protocol_(ProtocolSessionBuilder::create_resp_builder(true))
, sessions_io_(io)
, connection_(connection)
Expand All @@ -276,7 +277,7 @@ TcpAcceptor::TcpAcceptor(// Server parameters
, iothread_started_(false)
{
logger_.info() << "Server created!";
logger_.info() << "Port: " << port;
logger_.info() << "Endpoint: " << endpoint;

// Blocking I/O services
for (auto io: sessions_io_) {
Expand All @@ -286,12 +287,12 @@ TcpAcceptor::TcpAcceptor(// Server parameters

TcpAcceptor::TcpAcceptor(
std::vector<IOServiceT*> io,
int port,
EndpointT endpoint,
std::unique_ptr<ProtocolSessionBuilder> protocol,
std::shared_ptr<DbConnection> connection,
bool parallel)
//: parallel_(parallel)
: acceptor_(own_io_, EndpointT(boost::asio::ip::tcp::v4(), static_cast<u16>(port)))
: acceptor_(own_io_, endpoint)
, protocol_(std::move(protocol))
, sessions_io_(io)
, connection_(connection)
Expand All @@ -302,7 +303,7 @@ TcpAcceptor::TcpAcceptor(
, iothread_started_(false)
{
logger_.info() << "Server created!";
logger_.info() << "Port: " << port;
logger_.info() << "Endpoint: " << endpoint;

// Blocking I/O services
for (auto io: sessions_io_) {
Expand Down Expand Up @@ -415,7 +416,7 @@ void TcpAcceptor::handle_accept(std::shared_ptr<ProtocolSession> session, boost:
// Tcp Server //
// //

TcpServer::TcpServer(std::shared_ptr<DbConnection> connection, int concurrency, int port, TcpServer::Mode mode)
TcpServer::TcpServer(std::shared_ptr<DbConnection> connection, int concurrency, EndpointT ep, TcpServer::Mode mode)
: connection_(connection)
, barrier(static_cast<u32>(concurrency) + 1)
, stopped{0}
Expand All @@ -436,7 +437,7 @@ TcpServer::TcpServer(std::shared_ptr<DbConnection> connection, int concurrency,
bool parallel = mode == Mode::SHARED_EVENT_LOOP;
auto con = connection_.lock();
if (con) {
auto serv = std::make_shared<TcpAcceptor>(iovec, port, con, parallel);
auto serv = std::make_shared<TcpAcceptor>(iovec, ep, con, parallel);
serv->start();
acceptors_.push_back(serv);
} else {
Expand All @@ -448,7 +449,7 @@ TcpServer::TcpServer(std::shared_ptr<DbConnection> connection, int concurrency,

TcpServer::TcpServer(std::shared_ptr<DbConnection> connection,
int concurrency,
std::map<int, std::unique_ptr<ProtocolSessionBuilder> > protocol_map,
std::map<EndpointT, std::unique_ptr<ProtocolSessionBuilder> > protocol_map,
TcpServer::Mode mode)
: connection_(connection)
, barrier(static_cast<u32>(concurrency) + 1)
Expand All @@ -470,11 +471,11 @@ TcpServer::TcpServer(std::shared_ptr<DbConnection> connection,
bool parallel = mode == Mode::SHARED_EVENT_LOOP;
auto con = connection_.lock();
for (auto& kv: protocol_map) {
int port = kv.first;
EndpointT endpoint = kv.first;
auto protocol = std::move(kv.second);
logger_.info() << "Create acceptor for " << protocol->name() << ", port: " << port;
logger_.info() << "Create acceptor for " << protocol->name() << ", endpoint: " << endpoint;
if (con) {
auto serv = std::make_shared<TcpAcceptor>(iovec, port, std::move(protocol), con, parallel);
auto serv = std::make_shared<TcpAcceptor>(iovec, endpoint, std::move(protocol), con, parallel);
serv->start();
acceptors_.push_back(serv);
} else {
Expand Down Expand Up @@ -566,7 +567,7 @@ struct TcpServerBuilder {
if (nworkers >= AKU_MAX_THREADS) {
nworkers = AKU_MAX_THREADS - 4;
}
std::map<int, std::unique_ptr<ProtocolSessionBuilder>> protocol_map;
std::map<EndpointT, std::unique_ptr<ProtocolSessionBuilder>> protocol_map;
for (const auto& protocol: settings.protocols) {
std::unique_ptr<ProtocolSessionBuilder> inst;
if (protocol.name == "RESP") {
Expand All @@ -576,7 +577,7 @@ struct TcpServerBuilder {
} else {
s_logger_.error() << "Unknown protocol " << protocol.name;
}
protocol_map[protocol.port] = std::move(inst);
protocol_map[protocol.endpoint] = std::move(inst);
}
return std::make_shared<TcpServer>(con, nworkers, std::move(protocol_map));
}
Expand Down
25 changes: 12 additions & 13 deletions akumulid/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ typedef boost::asio::io_service IOServiceT;
typedef boost::asio::ip::tcp::acceptor AcceptorT;
typedef boost::asio::ip::tcp::socket SocketT;
typedef boost::asio::ip::tcp::endpoint EndpointT;
typedef boost::asio::io_service::strand StrandT;
typedef boost::asio::io_service::strand StrandT;
typedef boost::asio::io_service::work WorkT;
typedef std::function<void(aku_Status, u64)> ErrorCallback;

Expand Down Expand Up @@ -129,10 +129,10 @@ class TcpAcceptor : public std::enable_shared_from_this<TcpAcceptor> {
* @param port port to listen for new connections
* @param connection to the database
*/
TcpAcceptor(
std::vector<IOServiceT*> io, int port,
std::shared_ptr<DbConnection> connection,
bool parallel=true);
TcpAcceptor(std::vector<IOServiceT*> io,
EndpointT endpoint,
std::shared_ptr<DbConnection> connection,
bool parallel=true);

/**
* Create multiprotocol c-tor
Expand All @@ -141,12 +141,11 @@ class TcpAcceptor : public std::enable_shared_from_this<TcpAcceptor> {
* @param protocol is a protocol builder
* @param connection to the database
*/
TcpAcceptor(
std::vector<IOServiceT*> io,
int port,
std::unique_ptr<ProtocolSessionBuilder> protocol,
std::shared_ptr<DbConnection> connection,
bool parallel=true);
TcpAcceptor(std::vector<IOServiceT*> io,
EndpointT endpoint,
std::unique_ptr<ProtocolSessionBuilder> protocol,
std::shared_ptr<DbConnection> connection,
bool parallel=true);

~TcpAcceptor();

Expand Down Expand Up @@ -195,11 +194,11 @@ struct TcpServer : std::enable_shared_from_this<TcpServer>, Server {
* @param mode is a server mode (event loop per thread or one shared event loop)
* @note I've found that on Linux Mode::EVENT_LOOP_PER_THREAD gives more consistent results (the other option should be used for windows)
*/
TcpServer(std::shared_ptr<DbConnection> connection, int concurrency, int port, Mode mode=Mode::EVENT_LOOP_PER_THREAD);
TcpServer(std::shared_ptr<DbConnection> connection, int concurrency, EndpointT ep, Mode mode=Mode::EVENT_LOOP_PER_THREAD);

TcpServer(std::shared_ptr<DbConnection> connection,
int concurrency,
std::map<int, std::unique_ptr<ProtocolSessionBuilder>> protocol_map,
std::map<EndpointT, std::unique_ptr<ProtocolSessionBuilder> > protocol_map,
Mode mode=Mode::EVENT_LOOP_PER_THREAD);

~TcpServer();
Expand Down
Loading

0 comments on commit fa63655

Please sign in to comment.