From 59711c993e349a2724fd6016443956dac0fca3b8 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 20 Dec 2024 10:41:13 +0000 Subject: [PATCH] Refactor consistent query Instead of each member maintaining a shared query_index updated by the heartbeat_rpc messages this commit changes the query_index to just maintain each member's local sequence that is only used when the member is a leader. This reduces some complexity in the algorithm and possibly avoids certain bugs. --- src/ra_server.erl | 75 +++++++++++++++++++++++----------------- test/ra_server_SUITE.erl | 23 ++++++------ 2 files changed, 54 insertions(+), 44 deletions(-) diff --git a/src/ra_server.erl b/src/ra_server.erl index 8f2199d9..bffa31cd 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -731,11 +731,13 @@ handle_leader(#heartbeat_rpc{term = Term} = Msg, Term, CurTerm]), {follower, update_term(Term, State0#{leader_id => undefined}), [{next_event, Msg}]}; -handle_leader(#heartbeat_rpc{term = Term, leader_id = LeaderId}, +handle_leader(#heartbeat_rpc{term = Term, + query_index = QueryIndex, + leader_id = LeaderId}, #{current_term := CurTerm, cfg := #cfg{id = Id}} = State) when CurTerm > Term -> - Reply = heartbeat_reply(State), + Reply = heartbeat_reply(CurTerm, QueryIndex), {leader, State, [cast_reply(Id, LeaderId, Reply)]}; handle_leader(#heartbeat_rpc{term = Term}, #{current_term := CurTerm, cfg := #cfg{log_id = LogId}}) @@ -900,9 +902,11 @@ handle_candidate(#heartbeat_rpc{term = Term} = Msg, #{current_term := CurTerm} = State0) when Term >= CurTerm -> State = update_term_and_voted_for(Term, undefined, State0), {follower, State, [{next_event, Msg}]}; -handle_candidate(#heartbeat_rpc{leader_id = LeaderId}, State) -> +handle_candidate(#heartbeat_rpc{leader_id = LeaderId, + query_index = QueryIndex}, + #{current_term := CurTerm} = State) -> % term must be older return success=false - Reply = heartbeat_reply(State), + Reply = heartbeat_reply(CurTerm, QueryIndex), {candidate, State, 
[cast_reply(id(State), LeaderId, Reply)]}; handle_candidate({_PeerId, #heartbeat_reply{term = Term}}, #{cfg := #cfg{log_id = LogId}, @@ -980,12 +984,14 @@ handle_pre_vote(#heartbeat_rpc{term = Term} = Msg, State = update_term(Term, State0), % revert to follower state {follower, State#{votes => 0}, [{next_event, Msg}]}; -handle_pre_vote(#heartbeat_rpc{leader_id = LeaderId}, State) -> - % term must be older return success=false - Reply = heartbeat_reply(State), +handle_pre_vote(#heartbeat_rpc{leader_id = LeaderId, + query_index = QueryIndex}, + #{current_term := CurTerm} = State) -> + Reply = heartbeat_reply(CurTerm, QueryIndex), {pre_vote, State, [cast_reply(id(State), LeaderId, Reply)]}; handle_pre_vote({_PeerId, #heartbeat_reply{term = Term}}, - #{current_term := CurTerm} = State) when Term > CurTerm -> + #{current_term := CurTerm} = State) + when Term > CurTerm -> {follower, update_term(Term, State#{votes => 0}), []}; handle_pre_vote(#request_vote_rpc{term = Term} = Msg, #{current_term := CurTerm} = State0) @@ -1181,20 +1187,21 @@ handle_follower(#append_entries_rpc{term = Term, leader_id = LeaderId}, " ~b but current term is: ~b", [LogId, LeaderId, Term, CurTerm]), {follower, State, [cast_reply(Id, LeaderId, Reply)]}; -handle_follower(#heartbeat_rpc{query_index = RpcQueryIndex, term = Term, +handle_follower(#heartbeat_rpc{query_index = QueryIndex, + term = Term, leader_id = LeaderId}, #{current_term := CurTerm, cfg := #cfg{id = Id}} = State0) when Term >= CurTerm -> State1 = update_term(Term, State0), - #{query_index := QueryIndex} = State1, - NewQueryIndex = max(RpcQueryIndex, QueryIndex), - State2 = update_query_index(State1#{leader_id => LeaderId}, NewQueryIndex), - Reply = heartbeat_reply(State2), + State2 = State1#{leader_id => LeaderId}, + Reply = heartbeat_reply(Term, QueryIndex), {follower, State2, [cast_reply(Id, LeaderId, Reply)]}; -handle_follower(#heartbeat_rpc{leader_id = LeaderId}, - #{cfg := #cfg{id = Id}} = State) -> - Reply = 
heartbeat_reply(State), +handle_follower(#heartbeat_rpc{leader_id = LeaderId, + query_index = QueryIndex}, + #{current_term := CurTerm, + cfg := #cfg{id = Id}} = State) -> + Reply = heartbeat_reply(CurTerm, QueryIndex), {follower, State, [cast_reply(Id, LeaderId, Reply)]}; handle_follower({ra_log_event, {written, _, _} = Evt}, State0 = #{log := Log0, @@ -2350,16 +2357,14 @@ update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg, ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor), incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term), + %% this is probably not necessary reset_query_index(State#{current_term => Term, voted_for => VotedFor}) end. update_term(Term, State = #{current_term := CurTerm}) when Term =/= undefined andalso Term > CurTerm -> - %% reset query index here as a new term means a new query index - %% sequence - update_term_and_voted_for(Term, undefined, - State#{query_index => 0}); + update_term_and_voted_for(Term, undefined, State); update_term(_, State) -> State. 
@@ -2387,8 +2392,11 @@ state_query(leader, State) -> maps:get(leader_id, State, undefined); state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); -state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - leader_id := Self, query_index := QI, commit_index := CI, +state_query(members_info, #{cfg := #cfg{id = Self}, + cluster := Cluster, + leader_id := Self, + query_index := QI, + commit_index := CI, membership := Membership}) -> maps:map(fun(Id, Peer) -> case {Id, Peer} of @@ -2413,8 +2421,10 @@ state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, Peer#{voter_status => #{membership => voter}} end end, Cluster); -state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - query_index := QI, commit_index := CI, +state_query(members_info, #{cfg := #cfg{id = Self}, + cluster := Cluster, + query_index := QI, + commit_index := CI, membership := Membership}) -> %% Followers do not have sufficient information, %% bail out and send whatever we have. @@ -2949,7 +2959,8 @@ query_indexes(#{cfg := #cfg{id = Id}, query_index := QueryIndex}) -> maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; - (_K, #{voter_status := #{membership := Membership}}, Acc) when Membership =/= voter -> + (_K, #{voter_status := #{membership := Membership}}, Acc) + when Membership =/= voter -> Acc; (_K, #{query_index := Idx}, Acc) -> [Idx | Acc] @@ -2961,7 +2972,8 @@ match_indexes(#{cfg := #cfg{id = Id}, {LWIdx, _} = ra_log:last_written(Log), maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; - (_K, #{voter_status := #{membership := Membership}}, Acc) when Membership =/= voter -> + (_K, #{voter_status := #{membership := Membership}}, Acc) + when Membership =/= voter -> Acc; (_K, #{match_index := Idx}, Acc) -> [Idx | Acc] @@ -3010,7 +3022,8 @@ index_machine_version0(Idx, [{MIdx, V} | _]) index_machine_version0(Idx, [_ | Rem]) -> index_machine_version0(Idx, Rem). 
-heartbeat_reply(#{current_term := CurTerm, query_index := QueryIndex}) -> +heartbeat_reply(CurTerm, QueryIndex) + when is_integer(CurTerm) andalso is_integer(QueryIndex) -> #heartbeat_reply{term = CurTerm, query_index = QueryIndex}. update_heartbeat_rpc_effects(#{query_index := QueryIndex, @@ -3045,15 +3058,12 @@ make_heartbeat_rpc_effects(QueryRef, {State0, Effects}; _ -> NewQueryIndex = QueryIndex + 1, - State = update_query_index(State0, NewQueryIndex), + State = State0#{query_index => NewQueryIndex}, Effects = heartbeat_rpc_effects(Peers, Id, Term, NewQueryIndex), Waiting1 = queue:in({NewQueryIndex, QueryRef}, Waiting0), {State#{queries_waiting_heartbeats => Waiting1}, Effects} end. -update_query_index(State, NewQueryIndex) -> - State#{query_index => NewQueryIndex}. - reset_query_index(#{cluster := Cluster} = State) -> State#{cluster => maps:map(fun(_PeerId, Peer) -> Peer#{query_index => 0} @@ -3114,7 +3124,8 @@ update_peer_query_index(PeerId, QueryIndex, #{cluster := Cluster} = State0) -> get_current_query_quorum(State) -> agreed_commit(query_indexes(State)). --spec take_from_queue_while(fun((El) -> {true, Res} | false), queue:queue(El)) -> +-spec take_from_queue_while(fun((El) -> {true, Res} | false), + queue:queue(El)) -> {[Res], queue:queue(El)}. take_from_queue_while(Fun, Queue) -> take_from_queue_while(Fun, Queue, []). diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index ad0450b2..a17ba982 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -2332,12 +2332,12 @@ follower_heartbeat(_Config) -> {follower, State, [{cast, LeaderId, {Id, #heartbeat_reply{term = Term, - query_index = QIndex}}}]} + query_index = NewQueryIndex}}}]} = ra_server:handle_follower(Heartbeat#heartbeat_rpc{term = LowerTerm}, State), - %% Reply to the same term. 
Update query index + %% Reply to the same term {follower, - #{query_index := NewQueryIndex}, + State, [{cast, LeaderId, {Id, #heartbeat_reply{term = Term, @@ -2347,8 +2347,7 @@ follower_heartbeat(_Config) -> %% Reply to a higher term. Update follower term. NewTerm = Term + 1, {follower, - #{query_index := NewQueryIndex, - current_term := NewTerm, + #{current_term := NewTerm, voted_for := undefined}, [{cast, LeaderId, @@ -2405,7 +2404,7 @@ candidate_heartbeat(_Config) -> [{cast, LeaderId, {Id, #heartbeat_reply{term = Term, - query_index = QueryIndex}}}]} + query_index = NewQueryIndex}}}]} = ra_server:handle_candidate(HeartbeatLowTerm, State). candidate_heartbeat_reply(_Config) -> @@ -2457,7 +2456,7 @@ pre_vote_heartbeat(_Config) -> {pre_vote, State, [{cast, LeaderId, {Id, #heartbeat_reply{term = Term, - query_index = QueryIndex}}}]} + query_index = NewQueryIndex}}}]} = ra_server:handle_pre_vote(Heartbeat#heartbeat_rpc{term = LowTerm}, State). @@ -2510,10 +2509,10 @@ leader_heartbeat(_Config) -> NewTerm = Term + 1, HeartbeatHigherTerm = Heartbeat#heartbeat_rpc{term = NewTerm}, StateWithHigherTerm = set_peer_query_index( - State#{current_term => NewTerm, - leader_id => undefined, - voted_for => undefined}, - Id, 0), + State#{current_term => NewTerm, + leader_id => undefined, + voted_for => undefined}, + Id, 0), {follower, StateWithHigherTerm, [{next_event, HeartbeatHigherTerm}]} = ra_server:handle_leader(HeartbeatHigherTerm, State), @@ -2524,7 +2523,7 @@ leader_heartbeat(_Config) -> [{cast, LeaderId, {Id, #heartbeat_reply{term = Term, - query_index = QueryIndex}}}]} + query_index = NewQueryIndex}}}]} = ra_server:handle_leader(HeartbeatLowTerm, State). leader_heartbeat_reply_node_size_5(_Config) ->