Skip to content

Commit

Permalink
Introduce initial_machine_version server config.
Browse files Browse the repository at this point in the history
This new key can be used to specify the initial machine version
a new Ra server should be initialised against.

This allows machines to skip old versions and clean up old
code.

If the machine_version_strategy is all starting a server with
an initial machine version that is higher than the locally available
machine version will result in an error: {error, invalid_initial_machine_version}.

When machine_version_strategy=quorum the initial machine version
will be clamped to the locally available machine version.
  • Loading branch information
kjnilsson committed Jan 10, 2025
1 parent 5997bdf commit 0559967
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) ->
Config = maps:without([parent,
counter,
has_changed,
initial_machine_version,
%% don't write system config to disk as it will
%% be updated each time
system_config], Config0),
Expand Down
20 changes: 9 additions & 11 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

-include("ra.hrl").

-export([init/2,
-export([init/3,
apply/4,
tick/3,
snapshot_installed/5,
Expand All @@ -90,7 +90,9 @@
-type user_command() :: term().
%% the command type for a given machine implementation

-type machine_init_args() :: #{name := atom(), atom() => term()}.
-type machine_init_args() :: #{name := atom(),
machine_version := version(),
atom() => term()}.
%% the configuration passed to the init callback

-type machine() :: {machine, module(), AddInitArgs :: #{term() => term()}}.
Expand Down Expand Up @@ -294,15 +296,11 @@
%% @doc initialise a new machine
%% This is only called on startup only if there isn't yet a snapshot to recover
%% from. Once a snapshot has been taken this is never called again.
-spec init(machine(), atom()) -> state().
init({machine, _, Args} = Machine, Name) ->
%% init always dispatches to the first version
%% as this means every state machine in a mixed version cluster will
%% have a common starting point.
%% TODO: it should be possible to pass a lowest supported state machine
%% version flag in the init args so that old machine version can be purged
Mod = which_module(Machine, 0),
Mod:init(Args#{name => Name}).
-spec init(machine(), atom(), version()) -> state().
init({machine, _, Args} = Machine, Name, Version) ->
Mod = which_module(Machine, Version),
Mod:init(Args#{name => Name,
machine_version => Version}).

-spec apply(module(), command_meta_data(), command(), State) ->
{State, reply(), effects()} | {State, reply()}.
Expand Down
23 changes: 13 additions & 10 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
log_init_args := ra_log:ra_log_init_args(),
initial_members := [ra_server_id()],
machine := machine_conf(),
initial_machine_version => ra_machine:version(),
friendly_name => unicode:chardata(),
metrics_key => term(),
% TODO: review - only really used for
Expand Down Expand Up @@ -352,24 +353,26 @@ init(#{id := Id,
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),

LatestMacVer = ra_machine:version(Machine),
InitialMachineVersion = min(LatestMacVer,
maps:get(initial_machine_version, Config, 0)),

{_FirstIndex, Cluster0, MacVer, MacState,
{Cluster0, EffectiveMacVer, MacState,
{SnapshotIdx, _} = SnapshotIndexTerm} =
case ra_log:recover_snapshot(Log0) of
undefined ->
InitialMachineState = ra_machine:init(Machine, Name),
{0, make_cluster(Id, InitialNodes),
0, InitialMachineState, {0, 0}};
InitialMachineState = ra_machine:init(Machine, Name,
InitialMachineVersion),
{make_cluster(Id, InitialNodes),
InitialMachineVersion, InitialMachineState, {0, 0}};
{#{index := Idx,
term := Term,
cluster := ClusterNodes,
machine_version := MacVersion}, MacSt} ->
Clu = make_cluster(Id, ClusterNodes),
%% the snapshot is the last index before the first index
%% TODO: should this be Idx + 1?
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
{Clu, MacVersion, MacSt, {Idx, Term}}
end,
MacMod = ra_machine:which_module(Machine, MacVer),
MacMod = ra_machine:which_module(Machine, EffectiveMacVer),

CommitIndex = max(LastApplied, SnapshotIdx),
Cfg = #cfg{id = Id,
Expand All @@ -378,8 +381,8 @@ init(#{id := Id,
metrics_key = MetricKey,
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, MacVer}],
effective_machine_version = MacVer,
machine_versions = [{SnapshotIdx, EffectiveMacVer}],
effective_machine_version = EffectiveMacVer,
effective_machine_module = MacMod,
effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod),
max_pipeline_count = MaxPipelineCount,
Expand All @@ -389,7 +392,7 @@ init(#{id := Id,
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm),
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, MacVer),
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, EffectiveMacVer),

NonVoter = get_membership(Cluster0, Id, UId,
maps:get(membership, Config, voter)),
Expand Down
31 changes: 27 additions & 4 deletions src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
-include("ra.hrl").

-spec start_server(System :: atom(), ra_server:ra_server_config()) ->
supervisor:startchild_ret() | {error, not_new | system_not_started} | {badrpc, term()}.
supervisor:startchild_ret() |
{error, not_new | system_not_started | invalid_initial_machine_version} |
{badrpc, term()}.
start_server(System, #{id := NodeId,
uid := UId} = Config)
when is_atom(System) ->
Expand All @@ -61,9 +63,14 @@ start_server_rpc(System, UId, Config0) ->
%% check that the server isn't already registered
case ra_directory:name_of(System, UId) of
undefined ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
case validate_config(Config) of
ok ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
Err ->
Err
end;
Err ->
Err
end;
Expand All @@ -77,6 +84,22 @@ start_server_rpc(System, UId, Config0) ->
end
end.

validate_config(#{system_config := SysConf} = Config) ->
Strat = maps:get(machine_upgrade_strategy, SysConf, all),
case Config of
#{initial_machine_version := InitMacVer,
machine := {module, Mod, Args}} when Strat == all ->
MacVer = ra_machine:version({machine, Mod, Args}),
if MacVer < InitMacVer ->
{error, invalid_initial_machine_version};
true ->
ok
end;
_ ->
ok
end.


restart_server_rpc(System, {RaName, _Node}, AddConfig)
when is_atom(System) ->
case ra_system:fetch(System) of
Expand Down
162 changes: 157 additions & 5 deletions test/ra_machine_version_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ all_tests() ->
unversioned_machine_never_sees_machine_version_command,
unversioned_can_change_to_versioned,
server_upgrades_machine_state_on_noop_command,
server_applies_with_new_module
server_applies_with_new_module,
initial_machine_version,
initial_machine_version_quorum
% snapshot_persists_machine_version
].

Expand Down Expand Up @@ -65,8 +67,8 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config) ->
ok = logger:set_primary_config(level, all),
ra_server_sup_sup:remove_all(?SYS),
case TestCase of
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher ->
case lists:member(TestCase, machine_upgrade_quorum_tests()) of
true ->
ok = application:set_env(ra, machine_upgrade_strategy, quorum),
_ = ra_system:stop_default(),
{ok, _} = ra_system:start_default();
Expand Down Expand Up @@ -94,8 +96,8 @@ init_per_testcase(TestCase, Config) ->
end_per_testcase(TestCase, Config) ->
catch ra:delete_cluster(?config(cluster, Config)),
meck:unload(),
case TestCase of
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher ->
case lists:member(TestCase, machine_upgrade_quorum_tests()) of
true ->
ok = application:unset_env(ra, machine_upgrade_strategy),
_ = ra_system:stop_default(),
{ok, _} = ra_system:start_default();
Expand All @@ -104,6 +106,10 @@ end_per_testcase(TestCase, Config) ->
end,
ok.

machine_upgrade_quorum_tests() ->
[server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher,
initial_machine_version_quorum].

%%%===================================================================
%%% Test cases
%%%===================================================================
Expand Down Expand Up @@ -427,6 +433,152 @@ server_applies_with_new_module(Config) ->
snapshot_persists_machine_version(_Config) ->
error({todo, ?FUNCTION_NAME}).

initial_machine_version(Config) ->
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(3, MacVer),
init_state
end),
meck:expect(Mod, version, fun () -> 5 end),
meck:expect(Mod, which_module, fun (_) -> Mod end),
meck:expect(Mod, apply, fun (_, dummy, S) ->
{S, ok};
(_, {machine_version, 0, 3}, init_state) ->
exit(booo),
{state_v3, ok};
(_, {machine_version, 3, 5}, init_state) ->
ct:pal("3-5"),
{state_v5, ok}
end),
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(server_id, Config),
Machine = {module, Mod, #{}},
Configs = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 3}
end || Id <- [ServerId]],
% debugger:start(),
% int:i(ra_machine),
% int:i(ra_server_sup_sup),
% int:break(ra_server_sup_sup, 66),
{ok, _, _} = ra:start_cluster(?SYS, Configs, 5000),
await(fun () ->
{ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1),
S == state_v5
end, 100),
?assertMatch({ok, #{effective_machine_version := 5}, _},
ra:member_overview(ServerId)),
{ok, _} = ra:delete_cluster([ServerId]),
await(fun () -> whereis(element(1, ServerId)) == undefined end, 100),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(5, MacVer),
init_state
end),
meck:expect(Mod, apply, fun (Meta, meta, _S) ->
{state_v5, Meta};
(_, {machine_version, 0, 3}, init_state) ->
exit(booo),
{state_v3, ok};
(_, {machine_version, 5, 5}, init_state) ->
ct:pal("5-5"),
{state_v5, ok}
end),
Configs2 = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 9}
end || Id <- [ServerId]],
{error, cluster_not_formed} = ra:start_cluster(?SYS, Configs2, 5000),
ok.

initial_machine_version_quorum(Config) ->
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(3, MacVer),
init_state
end),
meck:expect(Mod, version, fun () -> 5 end),
meck:expect(Mod, which_module, fun (_) -> Mod end),
meck:expect(Mod, apply, fun (_, dummy, S) ->
{S, ok};
(_, {machine_version, 0, 3}, init_state) ->
exit(booo),
{state_v3, ok};
(_, {machine_version, 3, 5}, init_state) ->
ct:pal("3-5"),
{state_v5, ok}
end),
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(server_id, Config),
Machine = {module, Mod, #{}},
Configs = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 3}
end || Id <- [ServerId]],
% debugger:start(),
% int:i(ra_machine),
% int:i(ra_server_sup_sup),
% int:break(ra_server_sup_sup, 66),
{ok, _, _} = ra:start_cluster(?SYS, Configs, 5000),
await(fun () ->
{ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1),
S == state_v5
end, 100),
?assertMatch({ok, #{effective_machine_version := 5}, _},
ra:member_overview(ServerId)),
{ok, _} = ra:delete_cluster([ServerId]),
await(fun () -> whereis(element(1, ServerId)) == undefined end, 100),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(5, MacVer),
init_state
end),
meck:expect(Mod, apply, fun (Meta, meta, _S) ->
{state_v5, Meta};
(_, {machine_version, 0, 3}, init_state) ->
exit(booo),
{state_v3, ok};
(_, {machine_version, 5, 5}, init_state) ->
ct:pal("5-5"),
{state_v5, ok}
end),
Configs2 = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 9}
end || Id <- [ServerId]],
{ok, _, _} = ra:start_cluster(?SYS, Configs2, 5000),
{ok, #{machine_version := 5}, _} = ra:process_command(ServerId, meta),
await(fun () ->
{ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1),
ct:pal("S ~p", [S]),
S == state_v5
end, 100),
ct:pal("overview ~p", [ra:member_overview(ServerId)]),
ok.
%% Utility

validate_state_enters(States) ->
Expand Down

0 comments on commit 0559967

Please sign in to comment.