Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dumbbell committed Feb 6, 2024
1 parent 7f8727f commit 638d210
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
21 changes: 5 additions & 16 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1023,27 +1023,18 @@ handle_follower(#append_entries_rpc{term = Term,
_ ->
State1 = lists:foldl(fun pre_append_log_follower/2,
State0, Entries),
%% if the cluster has changed we need to update
%% the leaderboard
Effects1 = case maps:get(cluster, State0) =/=
maps:get(cluster, State1) of
true ->
[update_leaderboard | Effects0];
false ->
Effects0
end,
case ra_log:write(Entries, Log1) of
{ok, Log2} ->
{NextState, State, Effects} =
evaluate_commit_index_follower(State1#{log => Log2},
Effects1),
Effects0),
{NextState, State,
[{next_event, {ra_log_event, flush_cache}} | Effects]};
{error, wal_down} ->
{await_condition,
State1#{log => Log1,
condition => fun wal_down_condition/2},
Effects1};
Effects0};
{error, _} = Err ->
exit(Err)
end
Expand Down Expand Up @@ -2369,9 +2360,8 @@ apply_with({Idx, Term, {'$usr', CmdMeta, Cmd, ReplyType}},
end;
apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
{Mod, _, State0, MacSt, Effects0, Notifys0, LastTs}) ->
{Effects1, Notifys} = add_reply(CmdMeta, ok, ReplyType,
Effects0, Notifys0),
Effects = [update_leaderboard | Effects1],
{Effects, Notifys} = add_reply(CmdMeta, ok, ReplyType,
Effects0, Notifys0),
State = case State0 of
#{cluster_index_term := {CI, CT}}
when Idx > CI andalso Term >= CT ->
Expand Down Expand Up @@ -2601,7 +2591,7 @@ append_cluster_change(Cluster, From, ReplyMode,
cluster := PrevCluster,
cluster_index_term := {PrevCITIdx, PrevCITTerm},
current_term := Term} = State,
Effects0) ->
Effects) ->
% turn join command into a generic cluster change command
% that include the new cluster configuration
Command = {'$ra_cluster_change', From, Cluster, ReplyMode},
Expand All @@ -2610,7 +2600,6 @@ append_cluster_change(Cluster, From, ReplyMode,
% TODO: is it safe to do change the cluster config with an async write?
% what happens if the write fails?
Log = ra_log:append({NextIdx, Term, Command}, Log0),
Effects = [update_leaderboard | Effects0],
{ok, NextIdx, Term,
State#{log => Log,
cluster => Cluster,
Expand Down
34 changes: 24 additions & 10 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ recover(_, _, State) ->
%% effects post recovery
recovered(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
record_cluster_change(State0, State),
{keep_state, State, Actions};
recovered(internal, next, #state{server_state = ServerState} = State) ->
true = erlang:garbage_collect(),
Expand All @@ -378,7 +379,6 @@ recovered(internal, next, #state{server_state = ServerState} = State) ->

leader(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_leader_change(id(State0), State0),
%% TODO: reset refs?
{keep_state, State#state{leader_last_seen = undefined,
pending_notifys = #{},
Expand Down Expand Up @@ -1077,6 +1077,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) ->
{NextState, ServerState, Effects} ->
State = State0#state{server_state =
ra_server:persist_last_applied(ServerState)},
record_cluster_change(State0, State),
{NextState, State, Effects};
OtherErr ->
?ERR("handle_leader err ~p", [OtherErr]),
Expand All @@ -1103,8 +1104,11 @@ handle_candidate(Msg, State) ->
handle_pre_vote(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).

handle_follower(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).
handle_follower(Msg, State0) ->
Ret = handle_raft_state(?FUNCTION_NAME, Msg, State0),
{_NextState, State, _Effects} = Ret,
record_cluster_change(State0, State),
Ret.

handle_receive_snapshot(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).
Expand Down Expand Up @@ -1342,7 +1346,7 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
{State, Actions};
handle_effect(_, update_leaderboard, _EvtType, State, Actions) ->
ok = record_leader_change(leader_id(State), State),
%% Now a no-op. Was calling `ra_leaderboard:record/3' in the past.
{State, Actions};
handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _,
#state{monitors = Monitors} = State, Actions0) ->
Expand Down Expand Up @@ -1491,7 +1495,6 @@ follower_leader_change(Old, #state{pending_commands = Pending,
ok = aten_register(LeaderNode),
OldLeaderNode = ra_lib:ra_server_id_node(OldLeader),
_ = aten:unregister(OldLeaderNode),
ok = record_leader_change(NewLeader, New),
% leader has either changed or just been set
?INFO("~ts: detected a new leader ~w in term ~b",
[log_id(New), NewLeader, current_term(New)]),
Expand Down Expand Up @@ -1555,6 +1558,8 @@ do_state_query(voters, #{cluster := Cluster}) ->
end
end, [], Cluster),
Vs;
do_state_query(leader, #{leader := Leader}) ->
Leader;
do_state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
Expand Down Expand Up @@ -1872,11 +1877,20 @@ handle_process_down(Pid, Info, RaftState,
monitors = Monitors}),
{keep_state, State, Actions}.

record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName},
server_state = ServerState}) ->
Members = do_state_query(members, ServerState),
ok = ra_leaderboard:record(ClusterName, Leader, Members),
ok.
record_cluster_change(
#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerStateA},
#state{server_state = ServerStateB}) ->
LeaderA = do_state_query(leader, ServerStateA),
MembersA = do_state_query(members, ServerStateA),
LeaderB = do_state_query(leader, ServerStateB),
MembersB = do_state_query(members, ServerStateB),
case {LeaderA, MembersA} of
{LeaderB, MembersB} ->
ok;
_ ->
ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB)
end.

incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
counters:add(Cnt, Ix, N);
Expand Down

0 comments on commit 638d210

Please sign in to comment.