Skip to content

Commit

Permalink
Merge pull request #491 from rabbitmq/consistent-query-refactor
Browse files Browse the repository at this point in the history
Refactor consistent query
  • Loading branch information
kjnilsson authored Dec 20, 2024
2 parents 17b7290 + 59711c9 commit dce73f0
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 44 deletions.
75 changes: 43 additions & 32 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -731,11 +731,13 @@ handle_leader(#heartbeat_rpc{term = Term} = Msg,
Term, CurTerm]),
{follower, update_term(Term, State0#{leader_id => undefined}),
[{next_event, Msg}]};
handle_leader(#heartbeat_rpc{term = Term, leader_id = LeaderId},
handle_leader(#heartbeat_rpc{term = Term,
query_index = QueryIndex,
leader_id = LeaderId},
#{current_term := CurTerm,
cfg := #cfg{id = Id}} = State)
when CurTerm > Term ->
Reply = heartbeat_reply(State),
Reply = heartbeat_reply(CurTerm, QueryIndex),
{leader, State, [cast_reply(Id, LeaderId, Reply)]};
handle_leader(#heartbeat_rpc{term = Term},
#{current_term := CurTerm, cfg := #cfg{log_id = LogId}})
Expand Down Expand Up @@ -900,9 +902,11 @@ handle_candidate(#heartbeat_rpc{term = Term} = Msg,
#{current_term := CurTerm} = State0) when Term >= CurTerm ->
State = update_term_and_voted_for(Term, undefined, State0),
{follower, State, [{next_event, Msg}]};
handle_candidate(#heartbeat_rpc{leader_id = LeaderId}, State) ->
handle_candidate(#heartbeat_rpc{leader_id = LeaderId,
query_index = QueryIndex},
#{current_term := CurTerm} = State) ->
% term must be older return success=false
Reply = heartbeat_reply(State),
Reply = heartbeat_reply(CurTerm, QueryIndex),
{candidate, State, [cast_reply(id(State), LeaderId, Reply)]};
handle_candidate({_PeerId, #heartbeat_reply{term = Term}},
#{cfg := #cfg{log_id = LogId},
Expand Down Expand Up @@ -980,12 +984,14 @@ handle_pre_vote(#heartbeat_rpc{term = Term} = Msg,
State = update_term(Term, State0),
% revert to follower state
{follower, State#{votes => 0}, [{next_event, Msg}]};
handle_pre_vote(#heartbeat_rpc{leader_id = LeaderId}, State) ->
% term must be older return success=false
Reply = heartbeat_reply(State),
handle_pre_vote(#heartbeat_rpc{leader_id = LeaderId,
query_index = QueryIndex},
#{current_term := CurTerm} = State) ->
Reply = heartbeat_reply(CurTerm, QueryIndex),
{pre_vote, State, [cast_reply(id(State), LeaderId, Reply)]};
handle_pre_vote({_PeerId, #heartbeat_reply{term = Term}},
#{current_term := CurTerm} = State) when Term > CurTerm ->
#{current_term := CurTerm} = State)
when Term > CurTerm ->
{follower, update_term(Term, State#{votes => 0}), []};
handle_pre_vote(#request_vote_rpc{term = Term} = Msg,
#{current_term := CurTerm} = State0)
Expand Down Expand Up @@ -1181,20 +1187,21 @@ handle_follower(#append_entries_rpc{term = Term, leader_id = LeaderId},
" ~b but current term is: ~b",
[LogId, LeaderId, Term, CurTerm]),
{follower, State, [cast_reply(Id, LeaderId, Reply)]};
handle_follower(#heartbeat_rpc{query_index = RpcQueryIndex, term = Term,
handle_follower(#heartbeat_rpc{query_index = QueryIndex,
term = Term,
leader_id = LeaderId},
#{current_term := CurTerm,
cfg := #cfg{id = Id}} = State0)
when Term >= CurTerm ->
State1 = update_term(Term, State0),
#{query_index := QueryIndex} = State1,
NewQueryIndex = max(RpcQueryIndex, QueryIndex),
State2 = update_query_index(State1#{leader_id => LeaderId}, NewQueryIndex),
Reply = heartbeat_reply(State2),
State2 = State1#{leader_id => LeaderId},
Reply = heartbeat_reply(Term, QueryIndex),
{follower, State2, [cast_reply(Id, LeaderId, Reply)]};
handle_follower(#heartbeat_rpc{leader_id = LeaderId},
#{cfg := #cfg{id = Id}} = State) ->
Reply = heartbeat_reply(State),
handle_follower(#heartbeat_rpc{leader_id = LeaderId,
query_index = QueryIndex},
#{current_term := CurTerm,
cfg := #cfg{id = Id}} = State) ->
Reply = heartbeat_reply(CurTerm, QueryIndex),
{follower, State, [cast_reply(Id, LeaderId, Reply)]};
handle_follower({ra_log_event, {written, _, _} = Evt},
State0 = #{log := Log0,
Expand Down Expand Up @@ -2350,16 +2357,14 @@ update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg,
ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor),
incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term),
%% this is probably not necessary
reset_query_index(State#{current_term => Term,
voted_for => VotedFor})
end.

update_term(Term, State = #{current_term := CurTerm})
when Term =/= undefined andalso Term > CurTerm ->
%% reset query index here as a new term means a new query index
%% sequence
update_term_and_voted_for(Term, undefined,
State#{query_index => 0});
update_term_and_voted_for(Term, undefined, State);
update_term(_, State) ->
State.

Expand Down Expand Up @@ -2387,8 +2392,11 @@ state_query(leader, State) ->
maps:get(leader_id, State, undefined);
state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
leader_id := Self, query_index := QI, commit_index := CI,
state_query(members_info, #{cfg := #cfg{id = Self},
cluster := Cluster,
leader_id := Self,
query_index := QI,
commit_index := CI,
membership := Membership}) ->
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
Expand All @@ -2413,8 +2421,10 @@ state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
Peer#{voter_status => #{membership => voter}}
end
end, Cluster);
state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
query_index := QI, commit_index := CI,
state_query(members_info, #{cfg := #cfg{id = Self},
cluster := Cluster,
query_index := QI,
commit_index := CI,
membership := Membership}) ->
%% Followers do not have sufficient information,
%% bail out and send whatever we have.
Expand Down Expand Up @@ -2949,7 +2959,8 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := #{membership := Membership}}, Acc) when Membership =/= voter ->
(_K, #{voter_status := #{membership := Membership}}, Acc)
when Membership =/= voter ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
Expand All @@ -2961,7 +2972,8 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := #{membership := Membership}}, Acc) when Membership =/= voter ->
(_K, #{voter_status := #{membership := Membership}}, Acc)
when Membership =/= voter ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
Expand Down Expand Up @@ -3010,7 +3022,8 @@ index_machine_version0(Idx, [{MIdx, V} | _])
index_machine_version0(Idx, [_ | Rem]) ->
index_machine_version0(Idx, Rem).

heartbeat_reply(#{current_term := CurTerm, query_index := QueryIndex}) ->
heartbeat_reply(CurTerm, QueryIndex)
when is_integer(CurTerm) andalso is_integer(QueryIndex) ->
#heartbeat_reply{term = CurTerm, query_index = QueryIndex}.

update_heartbeat_rpc_effects(#{query_index := QueryIndex,
Expand Down Expand Up @@ -3045,15 +3058,12 @@ make_heartbeat_rpc_effects(QueryRef,
{State0, Effects};
_ ->
NewQueryIndex = QueryIndex + 1,
State = update_query_index(State0, NewQueryIndex),
State = State0#{query_index => NewQueryIndex},
Effects = heartbeat_rpc_effects(Peers, Id, Term, NewQueryIndex),
Waiting1 = queue:in({NewQueryIndex, QueryRef}, Waiting0),
{State#{queries_waiting_heartbeats => Waiting1}, Effects}
end.

update_query_index(State, NewQueryIndex) ->
State#{query_index => NewQueryIndex}.

reset_query_index(#{cluster := Cluster} = State) ->
State#{cluster => maps:map(fun(_PeerId, Peer) ->
Peer#{query_index => 0}
Expand Down Expand Up @@ -3114,7 +3124,8 @@ update_peer_query_index(PeerId, QueryIndex, #{cluster := Cluster} = State0) ->
get_current_query_quorum(State) ->
agreed_commit(query_indexes(State)).

-spec take_from_queue_while(fun((El) -> {true, Res} | false), queue:queue(El)) ->
-spec take_from_queue_while(fun((El) -> {true, Res} | false),
queue:queue(El)) ->
{[Res], queue:queue(El)}.
take_from_queue_while(Fun, Queue) ->
take_from_queue_while(Fun, Queue, []).
Expand Down
23 changes: 11 additions & 12 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2332,12 +2332,12 @@ follower_heartbeat(_Config) ->
{follower,
State,
[{cast, LeaderId, {Id, #heartbeat_reply{term = Term,
query_index = QIndex}}}]}
query_index = NewQueryIndex}}}]}
= ra_server:handle_follower(Heartbeat#heartbeat_rpc{term = LowerTerm}, State),

%% Reply to the same term. Update query index
%% Reply to the same term
{follower,
#{query_index := NewQueryIndex},
State,
[{cast,
LeaderId,
{Id, #heartbeat_reply{term = Term,
Expand All @@ -2347,8 +2347,7 @@ follower_heartbeat(_Config) ->
%% Reply to a higher term. Update follower term.
NewTerm = Term + 1,
{follower,
#{query_index := NewQueryIndex,
current_term := NewTerm,
#{current_term := NewTerm,
voted_for := undefined},
[{cast,
LeaderId,
Expand Down Expand Up @@ -2405,7 +2404,7 @@ candidate_heartbeat(_Config) ->
[{cast,
LeaderId,
{Id, #heartbeat_reply{term = Term,
query_index = QueryIndex}}}]}
query_index = NewQueryIndex}}}]}
= ra_server:handle_candidate(HeartbeatLowTerm, State).

candidate_heartbeat_reply(_Config) ->
Expand Down Expand Up @@ -2457,7 +2456,7 @@ pre_vote_heartbeat(_Config) ->
{pre_vote, State,
[{cast, LeaderId,
{Id, #heartbeat_reply{term = Term,
query_index = QueryIndex}}}]}
query_index = NewQueryIndex}}}]}
= ra_server:handle_pre_vote(Heartbeat#heartbeat_rpc{term = LowTerm},
State).

Expand Down Expand Up @@ -2510,10 +2509,10 @@ leader_heartbeat(_Config) ->
NewTerm = Term + 1,
HeartbeatHigherTerm = Heartbeat#heartbeat_rpc{term = NewTerm},
StateWithHigherTerm = set_peer_query_index(
State#{current_term => NewTerm,
leader_id => undefined,
voted_for => undefined},
Id, 0),
State#{current_term => NewTerm,
leader_id => undefined,
voted_for => undefined},
Id, 0),
{follower, StateWithHigherTerm, [{next_event, HeartbeatHigherTerm}]}
= ra_server:handle_leader(HeartbeatHigherTerm, State),

Expand All @@ -2524,7 +2523,7 @@ leader_heartbeat(_Config) ->
[{cast,
LeaderId,
{Id, #heartbeat_reply{term = Term,
query_index = QueryIndex}}}]}
query_index = NewQueryIndex}}}]}
= ra_server:handle_leader(HeartbeatLowTerm, State).

leader_heartbeat_reply_node_size_5(_Config) ->
Expand Down

0 comments on commit dce73f0

Please sign in to comment.