diff --git a/src/cets.erl b/src/cets.erl index 54340953..cd6260f2 100644 --- a/src/cets.erl +++ b/src/cets.erl @@ -130,7 +130,8 @@ is_leader := boolean(), opts := start_opts(), backlog := [backlog_entry()], - pause_monitors := [pause_monitor()] + pause_monitors := [pause_monitor()], + node_down_history := [node()] }. -type long_msg() :: @@ -154,7 +155,8 @@ memory := non_neg_integer(), ack_pid := ack_pid(), join_ref := join_ref(), - opts := start_opts() + opts := start_opts(), + node_down_history := [node()] }. -type handle_down_fun() :: fun((#{remote_pid := server_pid(), table := table_name()}) -> ok). @@ -417,7 +419,8 @@ init({Tab, Opts}) -> is_leader => true, opts => Opts, backlog => [], - pause_monitors => [] + pause_monitors => [], + node_down_history => [] }}. -spec handle_call(long_msg() | {op, op()}, from(), state()) -> @@ -524,7 +527,7 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid}) cets_ack:send_remote_down(AckPid, RemotePid), call_user_handle_down(RemotePid, State), Servers2 = lists:delete(RemotePid, Servers), - set_other_servers(Servers2, State); + update_node_down_history(RemotePid, set_other_servers(Servers2, State)); false -> %% This should not happen ?LOG_ERROR(#{ @@ -535,6 +538,9 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid}) State end. +update_node_down_history(RemotePid, State = #{node_down_history := History}) -> + State#{node_down_history := [node(RemotePid) | History]}. + %% Merge two lists of pids, create the missing monitors. -spec add_servers(Servers, Servers) -> Servers when Servers :: servers(). add_servers(Pids, Servers) -> @@ -742,6 +748,7 @@ handle_get_info( other_servers := Servers, ack_pid := AckPid, join_ref := JoinRef, + node_down_history := DownHistory, opts := Opts } ) -> @@ -752,6 +759,7 @@ handle_get_info( memory => ets:info(Tab, memory), ack_pid => AckPid, join_ref => JoinRef, + node_down_history => DownHistory, opts => Opts }. diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index f39ab5cf..09544896 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -90,11 +90,18 @@ join_status := not_running | running, should_retry_join := boolean(), timer_ref := reference() | undefined, - pending_wait_for_ready := [gen_server:from()] + pending_wait_for_ready := [gen_server:from()], + nodeup_timestamps := #{node() => milliseconds()}, + nodedown_timestamps := #{node() => milliseconds()}, + node_start_timestamps := #{node() => milliseconds()}, + start_time := milliseconds() }. +-type milliseconds() :: integer(). %% Backend could define its own options --type opts() :: #{name := atom(), _ := _}. +-type opts() :: #{ + name := atom(), _ := _ +}. -type start_result() :: {ok, pid()} | {error, term()}. -type server() :: pid() | atom(). -type system_info() :: map(). @@ -151,6 +158,7 @@ wait_for_ready(Server, Timeout) -> -spec init(term()) -> {ok, state()}. init(Opts) -> + StartTime = erlang:system_time(millisecond), %% Sends nodeup / nodedown ok = net_kernel:monitor_nodes(true), Mod = maps:get(backend_module, Opts, cets_discovery_file), @@ -159,7 +167,7 @@ init(Opts) -> BackendState = Mod:init(Opts), %% Changes phase from initial to regular (affects the check interval) erlang:send_after(timer:minutes(5), self(), enter_regular_phase), - {ok, #{ + State = #{ phase => initial, results => [], nodes => [], @@ -174,8 +182,16 @@ init(Opts) -> join_status => not_running, should_retry_join => false, timer_ref => undefined, - pending_wait_for_ready => [] - }}. 
+            pending_wait_for_ready => [],
+            nodeup_timestamps => #{},
+            node_start_timestamps => #{},
+            nodedown_timestamps => #{},
+            start_time => StartTime
+        },
+    %% Set initial timestamps because we would not receive nodeup events for
+    %% already connected nodes
+    State2 = lists:foldl(fun remember_nodeup_timestamp/2, State, nodes()),
+    {ok, State2}.
 
 -spec handle_call(term(), from(), state()) -> {reply, term(), state()} | {noreply, state()}.
 handle_call(get_tables, _From, State = #{tables := Tables}) ->
@@ -216,12 +232,35 @@ handle_info(check, State) ->
 handle_info({handle_check_result, Result, BackendState}, State) ->
     {noreply, handle_get_nodes_result(Result, BackendState, State)};
 handle_info({nodeup, Node}, State) ->
-    State2 = remove_node_from_unavailable_list(Node, State),
-    {noreply, try_joining(State2)};
-handle_info({nodedown, _Node}, State) ->
+    {NodeDownTime, State2} = handle_nodeup(Node, State),
+    ?LOG_WARNING(
+        set_defined(downtime_millisecond_duration, NodeDownTime, #{
+            what => nodeup,
+            remote_node => Node,
+            alive_nodes => length(nodes()) + 1,
+            %% We report this time so we can work on minimizing it.
+            %% It shows how long it took to discover nodes after startup.
+            time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
+        })
+    ),
+    send_start_time_to(Node, State),
+    State3 = remove_node_from_unavailable_list(Node, State2),
+    {noreply, try_joining(State3)};
+handle_info({nodedown, Node}, State) ->
+    {NodeUpTime, State2} = handle_nodedown(Node, State),
+    ?LOG_WARNING(
+        set_defined(connected_millisecond_duration, NodeUpTime, #{
+            what => nodedown,
+            remote_node => Node,
+            alive_nodes => length(nodes()) + 1,
+            time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
+        })
+    ),
     %% Do another check to update unavailable_nodes list
     self() ! check,
-    {noreply, State};
+    {noreply, State2};
+handle_info({start_time, Node, StartTime}, State) ->
+    {noreply, handle_receive_start_time(Node, StartTime, State)};
 handle_info({joining_finished, Results}, State) ->
     {noreply, handle_joining_finished(Results, State)};
 handle_info({ping_result, Node, Result}, State) ->
@@ -315,7 +354,7 @@ ping_not_connected_nodes(Nodes) ->
     Self = self(),
     NotConNodes = Nodes -- [node() | nodes()],
     [
-        spawn(fun() -> Self ! {ping_result, Node, net_adm:ping(Node)} end)
+        spawn(fun() -> Self ! {ping_result, Node, cets_ping:ping(Node)} end)
      || Node <- lists:sort(NotConNodes)
     ],
     ok.
@@ -454,3 +493,76 @@ has_join_result_for(Node, Table, #{results := Results}) ->
 -spec handle_system_info(state()) -> system_info().
 handle_system_info(State) ->
     State#{verify_ready => verify_ready(State)}.
+
+handle_nodedown(Node, State) ->
+    State2 = remember_nodedown_timestamp(Node, State),
+    remove_nodeup_timestamp(Node, State2).
+
+handle_nodeup(Node, State) ->
+    State2 = remember_nodeup_timestamp(Node, State),
+    {get_downtime(Node, State2), State2}.
+
+remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
+    Time = erlang:system_time(millisecond),
+    Map2 = Map#{Node => Time},
+    State#{nodeup_timestamps := Map2}.
+
+remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
+    Time = erlang:system_time(millisecond),
+    Map2 = Map#{Node => Time},
+    State#{nodedown_timestamps := Map2}.
+
+remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
+    StartTime = maps:get(Node, Map, undefined),
+    NodeUpTime = calculate_uptime(StartTime),
+    Map2 = maps:remove(Node, Map),
+    {NodeUpTime, State#{nodeup_timestamps := Map2}}.
+
+calculate_uptime(undefined) ->
+    undefined;
+calculate_uptime(StartTime) ->
+    time_since(StartTime).
+
+get_downtime(Node, #{nodedown_timestamps := Map}) ->
+    case maps:get(Node, Map, undefined) of
+        undefined ->
+            undefined;
+        WentDown ->
+            time_since(WentDown)
+    end.
+
+set_defined(_Key, undefined, Map) ->
+    Map;
+set_defined(Key, Value, Map) ->
+    Map#{Key => Value}.
+
+time_since_startup_in_milliseconds(#{start_time := StartTime}) ->
+    time_since(StartTime).
+
+time_since(StartTime) ->
+    erlang:system_time(millisecond) - StartTime.
+
+send_start_time_to(Node, #{start_time := StartTime}) ->
+    case erlang:process_info(self(), registered_name) of
+        {registered_name, Name} ->
+            erlang:send({Name, Node}, {start_time, node(), StartTime});
+        _ ->
+            ok
+    end.
+
+handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Map}) ->
+    case maps:get(Node, Map, undefined) of
+        undefined ->
+            ok;
+        StartTime ->
+            ?LOG_WARNING(#{
+                what => node_reconnects,
+                remote_node => Node,
+                start_time => StartTime,
+                text => <<"Netsplit recovery. The remote node has been connected to us before.">>
+            });
+        _ ->
+            %% A restarted node has reconnected; this is expected during rolling updates
+            ok
+    end,
+    State#{node_start_timestamps := maps:put(Node, StartTime, Map)}.
diff --git a/src/cets_join.erl b/src/cets_join.erl
index 28a01394..b63974c5 100644
--- a/src/cets_join.erl
+++ b/src/cets_join.erl
@@ -84,10 +84,12 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) ->
     %% Just lock all nodes, no magic here :)
     Nodes = [node() | nodes()],
     Retries = 1,
+    %% global could abort the transaction when one of the nodes goes down.
+    %% This usually happens during startup or a rolling update.
     case global:trans(LockRequest, F, Nodes, Retries) of
         aborted ->
             checkpoint(before_retry, JoinOpts),
-            ?LOG_ERROR(Info#{what => join_retry, reason => lock_aborted}),
+            ?LOG_INFO(Info#{what => join_retry, reason => lock_aborted}),
             join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts);
         Result ->
             Result
@@ -195,9 +197,9 @@ get_pids(Pid) ->
 check_pids(Info, LocPids, RemPids, JoinOpts) ->
     check_do_not_overlap(Info, LocPids, RemPids),
     checkpoint(before_check_fully_connected, JoinOpts),
+    check_could_reach_each_other(Info, LocPids, RemPids),
     check_fully_connected(Info, LocPids),
-    check_fully_connected(Info, RemPids),
-    check_could_reach_each_other(Info, LocPids, RemPids).
+    check_fully_connected(Info, RemPids).
 
 -spec check_could_reach_each_other(cets_long:log_info(), cets:servers(), cets:servers()) -> ok.
 check_could_reach_each_other(Info, LocPids, RemPids) ->
@@ -207,16 +209,7 @@ check_could_reach_each_other(Info, LocPids, RemPids) ->
         {min(LocNode, RemNode), max(LocNode, RemNode)}
      || LocNode <- LocNodes, RemNode <- RemNodes, LocNode =/= RemNode
     ]),
-    Results =
-        [
-            {Node1, Node2,
-                cets_long:run_tracked(
-                    #{task => ping_node, node1 => Node1, node2 => Node2}, fun() ->
-                        rpc:call(Node1, net_adm, ping, [Node2], 10000)
-                    end
-                )}
-         || {Node1, Node2} <- Pairs
-        ],
+    Results = cets_ping:ping_pairs(Pairs),
     NotConnected = [X || {_Node1, _Node2, Res} = X <- Results, Res =/= pong],
     case NotConnected of
         [] ->
diff --git a/src/cets_long.erl b/src/cets_long.erl
index dce536fb..bf2ccc63 100644
--- a/src/cets_long.erl
+++ b/src/cets_long.erl
@@ -91,6 +91,9 @@ run_monitor(Info, Ref, Parent, Start) ->
 
 monitor_loop(Mon, Info, Parent, Start, Interval) ->
     receive
+        {'DOWN', _MonRef, process, _Pid, stop} ->
+            %% Special case: the long task was stopped using exit(Pid, stop)
+            ok;
         {'DOWN', MonRef, process, _Pid, Reason} when Mon =:= MonRef ->
             ?LOG_ERROR(Info#{
                 what => task_failed,
diff --git a/src/cets_ping.erl b/src/cets_ping.erl
new file mode 100644
index 00000000..300e4416
--- /dev/null
+++ b/src/cets_ping.erl
@@ -0,0 +1,71 @@
+-module(cets_ping).
+-export([ping/1, ping_pairs/1]).
+ping(Node) when is_atom(Node) ->
+    %% It is important to understand that the initial setup for dist connections
+    %% is done by the single net_kernel process.
+    %% It calls net_kernel:setup, which calls inet_tcp_dist, which calls
+    %% erl_epmd:address_please/3, which does a DNS request.
+    %% If DNS is slow, the net_kernel process becomes busy.
+    %% But if we have a lot of offline nodes in the CETS discovery table,
+    %% we would call net_kernel for each of them (even though we would most likely
+    %% get {error, nxdomain} from erl_epmd:address_please/3).
+    %% So we do the DNS lookup here first and only after that try to connect.
+    case lists:member(Node, nodes()) of
+        true ->
+            pong;
+        false ->
+            case dist_util:split_node(Node) of
+                {node, Name, Host} ->
+                    Epmd = net_kernel:epmd_module(),
+                    V4 = Epmd:address_please(Name, Host, inet),
+                    V6 = Epmd:address_please(Name, Host, inet6),
+                    case {V4, V6} of
+                        {{error, _}, {error, _}} ->
+                            pang;
+                        _ ->
+                            connect_ping(Node)
+                    end;
+                _ ->
+                    pang
+            end
+    end.
+
+connect_ping(Node) ->
+    %% We could use net_adm:ping/1, but it:
+    %% - disconnects the node on pang, which we do not want
+    %%   (it could disconnect an already connected node because of race conditions)
+    %% - calls the net_kernel gen_server on the remote node,
+    %%   which could be busy doing something else,
+    %%   meaning a slower response time.
+    case net_kernel:connect_node(Node) of
+        true ->
+            pong;
+        _ ->
+            pang
+    end.
+
+-spec ping_pairs([{node(), node()}]) -> [{node(), node(), pong | Reason :: term()}].
+ping_pairs(Pairs) ->
+    %% We could use rpc:multicall(Nodes, cets_ping, ping, Args),
+    %% but that would increase the chance of nodes trying to contact each other at the same time.
+    ping_pairs_stop_on_pang(Pairs).
+
+ping_pairs_stop_on_pang([{Node1, Node2} | Pairs]) ->
+    F = fun() -> rpc:call(Node1, cets_ping, ping, [Node2], 10000) end,
+    Info = #{task => ping_node, node1 => Node1, node2 => Node2},
+    Res = cets_long:run_tracked(Info, F),
+    case Res of
+        pong ->
+            [{Node1, Node2, pong} | ping_pairs_stop_on_pang(Pairs)];
+        Other ->
+            %% We do not need to ping the rest of the nodes -
+            %% one node returning pang is enough to cancel the join.
+            %% This way we exit earlier and save some time
+            %% (connect_node to a dead node could be time consuming)
+            [{Node1, Node2, Other} | fail_pairs(Pairs, skipped)]
+    end;
+ping_pairs_stop_on_pang([]) ->
+    [].
+
+fail_pairs(Pairs, Reason) ->
+    [{Node1, Node2, Reason} || {Node1, Node2} <- Pairs].
diff --git a/src/cets_status.erl b/src/cets_status.erl
index 36b29e14..6b8a7865 100644
--- a/src/cets_status.erl
+++ b/src/cets_status.erl
@@ -64,7 +64,7 @@ gather_data(Disco) ->
     ThisNode = node(),
     Info = cets_discovery:system_info(Disco),
    #{tables := Tables} = Info,
-    OnlineNodes = [ThisNode | nodes()],
+    OnlineNodes = lists:sort([ThisNode | nodes()]),
     AvailNodes = available_nodes(Disco, OnlineNodes),
     Expected = get_local_table_to_other_nodes_map(Tables),
     OtherTabNodes = get_node_to_tab_nodes_map(AvailNodes, Disco),
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 2ea8cf76..c527354f 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -2087,8 +2087,7 @@ joining_not_fully_connected_node_is_not_allowed(Config) ->
     %% Pid5 and Pid3 could contact each other.
     %% Pid3 could contact Pid1 (they are joined).
     %% But Pid5 cannot contact Pid1.
-    {error,
-        {task_failed, {{nodedown, Node1}, {gen_server, call, [_, other_servers, infinity]}}, _}} =
+    {error, {task_failed, check_could_reach_each_other_failed, _}} =
         rpc(Peer5, cets_join, join, [lock_name(Config), #{}, Pid5, Pid3]),
     %% Still connected
     cets:insert(Pid1, {r1}),
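
Usage sketch (not part of the patch, hypothetical node names): cets_ping:ping_pairs/1 stops pinging after the first failing pair and reports the remaining pairs as skipped, while cets_join:check_could_reach_each_other/3 treats every result other than pong as a broken pair. This is why the test above now matches on the check_could_reach_each_other_failed reason.

    %% Assuming 'a@host1' can reach 'b@host1', while 'c@host1' is unreachable:
    cets_ping:ping_pairs([{'a@host1', 'b@host1'}, {'a@host1', 'c@host1'}, {'b@host1', 'c@host1'}]).
    %% => [{'a@host1', 'b@host1', pong},
    %%     {'a@host1', 'c@host1', pang},
    %%     {'b@host1', 'c@host1', skipped}]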
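
Usage sketch for the new node_down_history field (hypothetical session; it assumes the get_info message handled by handle_get_info/1 is exposed through a cets:info/1 wrapper). Each time a remote server goes down, its node is prepended to the history, so the most recent down event comes first.

    {ok, Pid} = cets:start(my_table, #{}),
    %% ... a remote server on 'other@host1' joins and later goes down ...
    #{node_down_history := History} = cets:info(Pid),
    %% History now starts with 'other@host1'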