Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Antidote channels #378

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/inter_dc_repl.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
%% on the site that is performing the query
-record(inter_dc_query_state, {
request_type :: inter_dc_message_type(),
zmq_id :: term(),
request_ref :: reference(),
request_id_num_binary :: binary(),
local_pid :: pid()
}).
Expand Down
4 changes: 1 addition & 3 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
{deps, [
{antidote_channels, {git, "https://github.com/AntidoteDB/antidote_channels", {ref,"7905e1ac2e9e3e3f9a4f79b9c54c3dcf0451f154"}}},
%% riak_core_lite framework
{riak_core, {git, "https://github.com/riak-core-lite/riak_core_lite", {tag, "v1.0.0-rc1"}}},

% ranch socket acceptor pool for managing protocol buffer sockets
{ranch, "1.7.1"},

%% efficient inter-dc messaging
{erlzmq, {git, "https://github.com/zeromq/erlzmq2", {ref, "573d583"}}},

%% antidote utilities
antidote_crdt,
antidote_pb_codec,
Expand Down
18 changes: 17 additions & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
{"1.1.0",
[{<<"accept">>,{pkg,<<"accept">>,<<"0.3.0">>},2},
{<<"amqp_client">>,{pkg,<<"amqp_client">>,<<"3.7.14">>},1},
{<<"antidote_channels">>,
{git,"https://github.com/AntidoteDB/antidote_channels",
{ref,"7905e1ac2e9e3e3f9a4f79b9c54c3dcf0451f154"}},
0},
{<<"antidote_crdt">>,{pkg,<<"antidote_crdt">>,<<"0.1.2">>},0},
{<<"antidote_pb_codec">>,{pkg,<<"antidote_pb_codec">>,<<"0.1.2">>},0},
{<<"antidote_stats">>,
Expand All @@ -12,13 +17,18 @@
{<<"erlzmq">>,
{git,"https://github.com/zeromq/erlzmq2",
{ref,"573d583930c4b1134e504bec83926d188112b401"}},
0},
1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},4},
{<<"jsx">>,{pkg,<<"jsx">>,<<"2.9.0">>},3},
{<<"lager">>,{pkg,<<"lager">>,<<"3.6.9">>},3},
{<<"poolboy">>,{pkg,<<"poolboy">>,<<"1.5.2">>},1},
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.4.1">>},1},
{<<"prometheus_process_collector">>,
{pkg,<<"prometheus_process_collector">>,<<"1.4.3">>},
1},
{<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.7.14">>},2},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.7.1">>},0},
{<<"recon">>,{pkg,<<"recon">>,<<"2.4.0">>},3},
{<<"riak_core">>,
{git,"https://github.com/riak-core-lite/riak_core_lite",
{ref,"9194861f5857aa1d0773807a9b0d48a55b3314b8"}},
Expand All @@ -27,14 +37,20 @@
[
{pkg_hash,[
{<<"accept">>, <<"2505B60BCB992CA79BD03AB7B8FEC8A520A47D9730F286DF1A479CC98B03F94B">>},
{<<"amqp_client">>, <<"BF8FD391A3FF02AE6FB8006F07900E22958C50A0A05B8B4856BBAF42F90FDAA2">>},
{<<"antidote_crdt">>, <<"A92A5ED8918D87AD22557825743C6EAC69DD6089D536E1BF5F9AC80992FA97F8">>},
{<<"antidote_pb_codec">>, <<"ECF51F08EE1FEE0D6E82D1B4AE68811A89660A3D65DB90694A84275683AEF106">>},
{<<"antidotec_pb">>, <<"40CD2A0A5F63284E6BB46C84B9D806A7682C79117CFC96154082A4C80759DD01">>},
{<<"elli">>, <<"7842861819869EBBFF7230BC77ECF2DF551AE3EAEF5FDE6B01A7561CACCB811E">>},
{<<"elli_prometheus">>, <<"FF41EA8D88D1EBD1CD7A6D43FCC02B33B47FF20272C097B9D3A3CCCD79980C05">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"jsx">>, <<"D2F6E5F069C00266CAD52FB15D87C428579EA4D7D73A33669E12679E203329DD">>},
{<<"lager">>, <<"387BCD836DC0C8AD9C6D90A0E0CE5B29676847950CBC527BCCC194A02028DE8E">>},
{<<"poolboy">>, <<"392B007A1693A64540CEAD79830443ABF5762F5D30CF50BC95CB2C1AAAFA006B">>},
{<<"prometheus">>, <<"1E96073B3ED7788053768FEA779CBC896DDC3BDD9BA60687F2AD50B252AC87D6">>},
{<<"prometheus_process_collector">>, <<"657386E8F142FC817347D95C1F3A05AB08710F7DF9E7F86DB6FACAED107ED929">>},
{<<"rabbit_common">>, <<"607741EFF927EC9FEB5D190C4624816C6DFCD6F49B1F8BAB1F753C9417D74141">>},
{<<"ranch">>, <<"6B1FAB51B49196860B733A49C07604465A47BDB78AA10C1C16A3D199F7F8C881">>},
{<<"recon">>, <<"901FF78B39C754FB4D6FD72DCF0DBD398967BBD2E4D59C08D4D7AA44A73DE91D">>},
{<<"vectorclock">>, <<"6C4A9D44895F51BB99910DBE31FC691BF05FA6B2BF84986F6E3BDE4BD18F6CBA">>}]}
].
6 changes: 3 additions & 3 deletions src/antidote.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
kernel,
stdlib,
riak_core,
erlzmq,
runtime_tools,
tools,
antidote_stats
antidote_stats,
ranch
]},
{included_applications, [
vectorclock,
antidote_pb_codec,
antidote_crdt,
ranch
antidote_channels
]},

{mod, {antidote_app, []}},
Expand Down
52 changes: 29 additions & 23 deletions src/antidote_pb_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,45 @@
%%====================================================================

start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%%====================================================================
%% Supervisor callbacks
%%====================================================================

%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
init([]) ->
% we use the rest_for_one restart strategy here,
% because we want the listeners to restart, if the main ranch
% process crashes, but not vice versa.
% We tolerate only 1 error per 5 seconds, because most errors should
% be handled at a lower level.
SupFlags = #{strategy => rest_for_one, intensity => 1, period => 5},
RanchSupSpec = #{
id => ranch_sup,
start => {ranch_sup, start_link, []},
restart => permanent,
shutdown => 1000,
type => supervisor,
modules => [ranch_sup]},
{ok, {SupFlags, [
RanchSupSpec,
pb_listener()
]}}.
% we use the rest_for_one restart strategy here,
% because we want the listeners to restart, if the main ranch
% process crashes, but not vice versa.
% We tolerate only 1 error per 5 seconds, because most errors should
% be handled at a lower level.
SupFlags = #{strategy => rest_for_one, intensity => 1, period => 5},
case whereis(ranch_sup) of
undefined ->
RanchSupSpec = #{
id => ranch_sup,
start => {ranch_sup, start_link, []},
restart => permanent,
shutdown => 1000,
type => supervisor,
modules => [ranch_sup]},
{ok, {SupFlags, [RanchSupSpec, pb_listener()]}};
_ ->
{ok, {SupFlags, [pb_listener()]}}
end.


%%====================================================================
%% Internal functions
%%====================================================================

pb_listener() ->
NumberOfAcceptors = application:get_env(ranch, pb_pool_size, 100),
Port = application:get_env(ranch, pb_port, 8087),
MaxConnections = application:get_env(ranch, pb_max_connections, 1024),
RanchOptions = [{port, Port}, {max_connections, MaxConnections}],
ranch:child_spec(antidote_pb_process, NumberOfAcceptors, ranch_tcp, RanchOptions, antidote_pb_protocol, []).
NumberOfAcceptors = application:get_env(ranch, pb_pool_size, 100),
Port = application:get_env(ranch, pb_port, 8087),
MaxConnections = application:get_env(ranch, pb_max_connections, 1024),
RanchOptions = [{port, Port}, {max_connections, MaxConnections}],
ranch:child_spec(antidote_pb_process, NumberOfAcceptors,
ranch_tcp, RanchOptions,
antidote_pb_protocol, []
).
4 changes: 2 additions & 2 deletions src/antidote_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ init(_Args) ->

BCounterManager = ?CHILD(bcounter_mgr, worker, []),

ZMQContextManager = ?CHILD(zmq_context, worker, []),
%ZMQContextManager = ?CHILD(zmq_context, worker, []),
InterDcPub = ?CHILD(inter_dc_pub, worker, []),
InterDcSub = ?CHILD(inter_dc_sub, worker, []),
StableMetaData = ?CHILD(stable_meta_data_server, worker, []),
Expand Down Expand Up @@ -119,7 +119,7 @@ init(_Args) ->
ClockSIMaster,
ClockSIiTxCoordSup,
MaterializerMaster,
ZMQContextManager,
%ZMQContextManager,
InterDcPub,
InterDcSub,
InterDcSubVnode,
Expand Down
93 changes: 55 additions & 38 deletions src/inter_dc_pub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,46 @@

-include("antidote.hrl").
-include("inter_dc_repl.hrl").

-include_lib("antidote_channels/include/antidote_channel.hrl").
-include_lib("kernel/include/logger.hrl").

%% API
-export([
broadcast/1,
get_address/0,
get_address_list/0]).
broadcast/1,
get_address/0,
get_address_list/0]).

%% Server methods
-export([
init/1,
start_link/0,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
init/1,
start_link/0,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).

%% State
-record(state, {socket}). %% socket :: erlzmq_socket()
-record(state, {channel :: channel()}).

%%%% API --------------------------------------------------------------------+

-spec get_address() -> socket_address().
get_address() ->
%% first try resolving our hostname according to the node name
[_, Hostname] = string:tokens(atom_to_list(erlang:node()), "@"),
Ip = case inet:getaddr(Hostname, inet) of
{ok, HostIp} -> HostIp;
{error, _} ->
%% cannot resolve hostname locally, fall back to interface ip
%% TODO check if we do not return a link-local address
{ok, List} = inet:getif(),
{IIp, _, _} = hd(List),
IIp
end,
Port = application:get_env(antidote, pubsub_port, ?DEFAULT_PUBSUB_PORT),
{Ip, Port}.
%% first try resolving our hostname according to the node name
[_, Hostname] = string:tokens(atom_to_list(erlang:node()), "@"),
Ip = case inet:getaddr(Hostname, inet) of
{ok, HostIp} -> HostIp;
{error, _} ->
%% cannot resolve hostname locally, fall back to interface ip
%% TODO check if we do not return a link-local address
{ok, List} = inet:getif(),
{IIp, _, _} = hd(List),
IIp
end,
Port = application:get_env(antidote, pubsub_port, ?DEFAULT_PUBSUB_PORT),
{Ip, Port}.

-spec get_address_list() -> [socket_address()].
get_address_list() ->
Expand All @@ -80,32 +83,46 @@ get_address_list() ->
%% get host name from node name
[_, Hostname] = string:tokens(atom_to_list(erlang:node()), "@"),
IpList = case inet:getaddr(Hostname, inet) of
{ok, HostIp} -> [HostIp|List1];
{error, _} -> List1
end,
{ok, HostIp} -> [HostIp | List1];
{error, _} -> List1
end,
Port = application:get_env(antidote, pubsub_port, ?DEFAULT_PUBSUB_PORT),
[{Ip1, Port} || Ip1 <- IpList, Ip1 /= {127, 0, 0, 1}].

-spec broadcast(interdc_txn()) -> ok.
broadcast(Txn) ->
case catch gen_server:call(?MODULE, {publish, inter_dc_txn:to_bin(Txn)}) of
{'EXIT', _Reason} -> ?LOG_WARNING("Failed to broadcast a transaction."); %% this can happen if a node is shutting down.
Normal -> Normal
end.
broadcast(#interdc_txn{partition = P} = Txn) ->
case catch gen_server:call(?MODULE, {publish, inter_dc_txn:partition_to_bin(P), Txn}) of
{'EXIT', _Reason} ->
?LOG_WARNING("Failed to broadcast a transaction."); %% this can happen if a node is shutting down.
Normal -> Normal
end.

%%%% Server methods ---------------------------------------------------------+

start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
{_, Port} = get_address(),
Socket = zmq_utils:create_bind_socket(pub, false, Port),
?LOG_INFO("Publisher started on port ~p", [Port]),
{ok, #state{socket = Socket}}.
{_, Port} = get_address(),

Config = #{
module => channel_zeromq,
pattern => pub_sub,
namespace => <<>>,
network_params => #{
host => {0, 0, 0, 0},
port => Port
}
},

{ok, Channel} = antidote_channel:start_link(Config),
{ok, #state{channel = Channel}}.

handle_call({publish, Partition, Message}, _From, State) ->
ok = antidote_channel:send(State#state.channel, #pub_sub_msg{topic = Partition, payload = Message}),
{reply, ok, State}.

handle_call({publish, Message}, _From, State) -> {reply, erlzmq:send(State#state.socket, Message), State}.

terminate(_Reason, State) -> erlzmq:close(State#state.socket).
terminate(_Reason, State) -> antidote_channel:stop(State#state.channel).
handle_cast(_Request, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
Loading