Skip to content

Commit

Permalink
Add configuration setting to restore old machine upgrade behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
dumbbell committed Jan 7, 2025
1 parent bb02234 commit 1057a7d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 7 deletions.
29 changes: 24 additions & 5 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@
%% The simple machine config is a version that can only be used for simple state
%% machines that cannot access any of the advanced features.

-type upgrade_machine_when() :: after_election | when_available_everywhere.
%% When a new Ra machine version should be upgraded to.
%%
%% <ul>
%% <li>`after_election': when a leader is elected and if it supports a newer
%% machine version, it switches to that new version. This was the default and
%% only possible behavior in Ra up to 2.15.</li>
%% <li>`when_available_everywhere': a new machine version is used only when
%% all members of the cluster support it. This is the default behavior
%% starting with Ra 2.16.</li>
%% </ul>

-type ra_server_config() :: #{id := ra_server_id(),
uid := ra_uid(),
%% a friendly name to refer to a particular
Expand Down Expand Up @@ -271,7 +283,8 @@
command_reply_mode/0,
ra_event_formatter_fun/0,
effect/0,
effects/0
effects/0,
upgrade_machine_when/0
]).

-spec name(ClusterName :: ra_cluster_name(), UniqueSuffix::string()) -> atom().
Expand Down Expand Up @@ -3445,19 +3458,25 @@ after_log_append_reply(Cmd, Idx, Term, Effects0) ->

post_election_effects(
#{cfg := #cfg{effective_machine_version = EffectiveMacVer,
machine = Mac}} = State) ->
machine = Mac,
system_config = #{upgrade_machine_when := UpgradeMachineWhen}
}} = State) ->
Peers = peers(State),
PeerIds = maps:keys(Peers),
case PeerIds of
[] ->

UpgradeMachineNow = (PeerIds =:= [] orelse
UpgradeMachineWhen =:= after_election),

case UpgradeMachineNow of
true ->
%% Either this node is alone in the cluster, or `upgrade_machine_when' is
%% set to `after_election': in both cases we can send the `noop' command
%% with the newer machine version right away.
MacVer = ra_machine:version(Mac),
Noop = {noop, #{ts => erlang:system_time(millisecond)},
MacVer},
NoopEffect = {next_event, cast, {command, Noop}},
[NoopEffect];
_ ->
false ->
%% We continue to send the `noop' command immediately after
%% becoming a leader, but compared to Ra 2.15 and older, we don't
%% set the machine version to the latest: we keep the same
Expand Down
1 change: 1 addition & 0 deletions src/ra_server.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-define(DEFAULT_MAX_PIPELINE_COUNT, 4096).
-define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB
-define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000).
-define(DEFAULT_UPGRADE_MACHINE_WHEN, when_available_everywhere).
-define(FLUSH_COMMANDS_SIZE, 16).

-record(cfg,
Expand Down
6 changes: 5 additions & 1 deletion src/ra_system.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
low_priority_commands_in_memory_size => non_neg_integer(),
server_recovery_strategy => undefined |
registered |
{module(), atom(), list()}
{module(), atom(), list()},
upgrade_machine_when => ra_server:upgrade_machine_when()
}.

-export_type([
Expand Down Expand Up @@ -116,6 +117,8 @@ default_config() ->
?FLUSH_COMMANDS_SIZE),
LowPriorityInMemSize = application:get_env(ra, low_priority_commands_in_memory_size,
?FLUSH_COMMANDS_SIZE),
UpgradeMachineWhen = application:get_env(ra, upgrade_machine_when,
?DEFAULT_UPGRADE_MACHINE_WHEN),
#{name => default,
data_dir => DataDir,
wal_data_dir => WalDataDir,
Expand All @@ -142,6 +145,7 @@ default_config() ->
receive_snapshot_timeout => ReceiveSnapshotTimeout,
low_priority_commands_flush_size => LowPriorityCommandsFlushSize,
low_priority_commands_in_memory_size => LowPriorityInMemSize,
upgrade_machine_when => UpgradeMachineWhen,
names => #{wal => ra_log_wal,
wal_sup => ra_log_wal_sup,
log_sup => ra_log_sup,
Expand Down
79 changes: 78 additions & 1 deletion test/ra_machine_version_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ all_tests() ->
[
server_with_higher_version_needs_quorum_to_be_elected,
cluster_waits_for_all_members_to_have_latest_version_to_upgrade,
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher,
unversioned_machine_never_sees_machine_version_command,
unversioned_can_change_to_versioned,
server_upgrades_machine_state_on_noop_command,
Expand Down Expand Up @@ -64,6 +65,14 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config) ->
ok = logger:set_primary_config(level, all),
ra_server_sup_sup:remove_all(?SYS),
case TestCase of
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher ->
ok = application:set_env(ra, upgrade_machine_when, after_election),
_ = ra_system:stop_default(),
{ok, _} = ra_system:start_default();
_ ->
ok
end,
ServerName1 = list_to_atom(atom_to_list(TestCase) ++ "1"),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
Expand All @@ -82,9 +91,17 @@ init_per_testcase(TestCase, Config) ->
{server_id3, {ServerName3, node()}}
| Config].

end_per_testcase(_TestCase, Config) ->
%% Common Test per-testcase cleanup.
%%
%% Deletes the test cluster (best effort: any failure is swallowed by
%% `catch'), unloads all meck mocks, then undoes the test-case specific
%% application environment change, if any.
end_per_testcase(TestCase, Config) ->
    catch ra:delete_cluster(?config(cluster, Config)),
    meck:unload(),
    reset_upgrade_machine_when(TestCase).

%% Only one test case overrides `upgrade_machine_when'; for that one, drop the
%% override and restart the default Ra system so it is started again without
%% the override. Every other test case needs no extra cleanup.
reset_upgrade_machine_when(
  server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher) ->
    ok = application:unset_env(ra, upgrade_machine_when),
    _ = ra_system:stop_default(),
    {ok, _} = ra_system:start_default(),
    ok;
reset_upgrade_machine_when(_OtherTestCase) ->
    ok.

%%%===================================================================
Expand Down Expand Up @@ -206,6 +223,66 @@ cluster_waits_for_all_members_to_have_latest_version_to_upgrade(Config) ->

ok.

%% Scenario: a three-member cluster is started with a machine reporting
%% version 1. All members are stopped, then two of them are restarted with the
%% machine reporting version 2 while the third one still reports version 1.
%% Once the third member reports an effective machine version of 2 (with a
%% local machine version of 1), the current leader is stopped and the test
%% asserts that the other version 2 member becomes the leader.
%%
%% init_per_testcase/2 sets the `upgrade_machine_when' application environment
%% variable to `after_election' for this test case.
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher(Config) ->
ok = ra_env:configure_logger(logger),
%% Add a debug-level logger handler writing to "ra.log" in the test case's
%% private directory.
LogFile = filename:join([?config(priv_dir, Config), "ra.log"]),
LogConfig = #{config => #{type => {file, LogFile}}, level => debug},
logger:add_handler(ra_handler, logger_std_h, LogConfig),
ok = logger:set_primary_config(level, all),
ct:pal("handler config ~p", [logger:get_handler_config()]),
%% Mock the machine module; it initially reports machine version 1 and its
%% `apply' callback leaves the state unchanged.
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (_) -> init_state end),
meck:expect(Mod, version, fun () -> 1 end),
meck:expect(Mod, which_module, fun (_) -> Mod end),
meck:expect(Mod, apply, fun (_, _, S) -> {S, ok} end),
Cluster = ?config(cluster, Config),
ClusterName = ?config(cluster_name, Config),
Leader = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
[Follower1, Follower2] = lists:delete(Leader, Cluster),
timer:sleep(100),
%% Stop all three members, then make the machine module report version 2
%% when called from the process registered under Leader's or Follower1's
%% name, and version 1 when called from any other process.
ra:stop_server(?SYS, Leader),
ra:stop_server(?SYS, Follower1),
ra:stop_server(?SYS, Follower2),
meck:expect(Mod, version, fun () ->
New = [whereis(element(1, Leader)),
whereis(element(1, Follower1))],
case lists:member(self(), New) of
true -> 2;
_ -> 1
end
end),
%% Restart the two version 2 members first and record which member is the
%% leader before bringing Follower2 back.
ra:restart_server(?SYS, Leader),
ra:restart_server(?SYS, Follower1),
timer:sleep(100),
{ok, _, Leader2} = ra:members(Leader, 2000),
ra:restart_server(?SYS, Follower2),
%% need to wait until the restarted Follower2 discovers the current
%% effective machine version
await(fun () ->
case ra:member_overview(Follower2) of
{ok, #{effective_machine_version := 2,
machine_version := 1}, _} ->
true;
_ ->
false
end
end, 100),
%% at this point the effective machine version known by all members is 2
%% but Follower2's local machine version is 1 as it hasn't been "upgraded"
%% yet
%% stop the leader to trigger an election that Follower2 must not win
ra:stop_server(?SYS, Leader2),
%% The expected new leader is whichever of Leader/Follower1 (the two
%% version 2 members) was not the leader that has just been stopped.
ExpectedLeader = case Leader2 of
Follower1 -> Leader;
_ -> Follower1
end,
%% the remaining version 2 member should now be elected
?assertMatch({ok, _, ExpectedLeader}, ra:members(ExpectedLeader, 60000)),

ok.

unversioned_machine_never_sees_machine_version_command(Config) ->
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
Expand Down

0 comments on commit 1057a7d

Please sign in to comment.