Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New log effect type: log_ext that allows entries residing in segment files to be read by another process. #477

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,17 @@ logger:set_primary_config(level, debug).
Ra attempts to follow [Semantic Versioning](https://semver.org/).

The modules that form part of the public API are:

* `ra`
* `ra_machine` (behaviour callbacks only)
* `ra_aux`
* `ra_system`
* `ra_counters`
* `ra_counters` (counter keys may vary between minors)
* `ra_leaderboard`
* `ra_env`
* `ra_directory`
* `ra_flru`
* `ra_log_read_plan`

## Copyright and License

Expand Down
17 changes: 17 additions & 0 deletions docs/internals/STATE_MACHINE_TUTORIAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,23 @@ Only the leader that first applies an entry will attempt the effect.
Followers process the same set of commands but simply throw away any effects
returned by the state machine unless a specific effect provides the `local` option.

### Machine Effects table

| Spec | Executed on |
| -----| ----------- |
| `{send_msg, pid(), Msg :: term()}` | leader |
| `{send_msg, pid(), Msg :: term(), [local]}` | on member local to `pid()` else leader |
| `{monitor \| demonitor, process \| node, pid() \| node()}` | leader |
| `{mod_call, mfa()}` | leader |
| `{timer, Name :: term(), Time :: non_neg_integer() \| infinity}` | leader |
| `{append, term()}` | leader |
| `{append, term(), ra_server:command_reply_mode()}` | leader |
| `{log, [ra_index()], fun(([user_command()]) -> effects())}` | leader |
| `{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}}` | on member local to `node()` else leader |
| `{log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}}` | on member local to `node()` else leader |
| `{release_cursor \| checkpoint, ra_index(), term()}` | all members |
| `{aux, term()}` | all members |


### Send a message

Expand Down
1 change: 0 additions & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,6 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() ->
key_metrics({_, N} = ServerId, Timeout) ->
erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout).


%% internal

-spec usr(UserCommand, ReplyMode) -> Command when
Expand Down
7 changes: 6 additions & 1 deletion src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
is_dir/1,
is_file/1,
ensure_dir/1,
consult/1
consult/1,
cons/2
]).

-type file_err() :: file:posix() | badarg | terminated | system_limit.
Expand Down Expand Up @@ -454,6 +455,10 @@ consult(Path) ->
Err
end.

%% @doc Prepends `Item' to `List'.
%% Functional form of the `[Item | List]' cons operator, usable as a fun
%% (e.g. `fun ra_lib:cons/2') where a fold or callback needs one.
%% Fails with `function_clause' if `List' is not a list, so an improper
%% list can never be built by accident.
%% @end
-spec cons(term(), list()) -> nonempty_list().
cons(Item, List)
  when is_list(List) ->
    [Item | List].

tokens(Str) ->
case erl_scan:string(Str) of
{ok, Tokens, _EndLoc} ->
Expand Down
104 changes: 87 additions & 17 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
write_sync/2,
fold/5,
sparse_read/2,
partial_read/3,
execute_read_plan/3,
read_plan_info/1,
last_index_term/1,
set_last_index/2,
handle_event/2,
Expand Down Expand Up @@ -70,9 +73,10 @@
{down, pid(), term()}.

-type event() :: {ra_log_event, event_body()}.
-type transform_fun() :: fun ((ra_index(), ra_term(), ra_server:command()) -> term()).

-type effect() ::
{delete_snapshot, Dir :: file:filename(), ra_idxterm()} |
{delete_snapshot, Dir :: file:filename_all(), ra_idxterm()} |
{monitor, process, log, pid()} |
ra_snapshot:effect() |
ra_server:effect().
Expand All @@ -84,7 +88,7 @@

-record(cfg, {uid :: ra_uid(),
log_id :: unicode:chardata(),
directory :: file:filename(),
directory :: file:filename_all(),
min_snapshot_interval = ?MIN_SNAPSHOT_INTERVAL :: non_neg_integer(),
min_checkpoint_interval = ?MIN_CHECKPOINT_INTERVAL :: non_neg_integer(),
snapshot_module :: module(),
Expand All @@ -110,7 +114,11 @@
tx = false :: boolean()
}).

-record(read_plan, {dir :: file:filename_all(),
the-mikedavis marked this conversation as resolved.
Show resolved Hide resolved
read :: #{ra_index() := log_entry()},
plan :: ra_log_reader:read_plan()}).

-opaque read_plan() :: #read_plan{}.
-opaque state() :: #?MODULE{}.

-type ra_log_init_args() :: #{uid := ra_uid(),
Expand Down Expand Up @@ -145,6 +153,7 @@
atom() => term()}.

-export_type([state/0,
read_plan/0,
ra_log_init_args/0,
ra_meta_key/0,
segment_ref/0,
Expand All @@ -154,13 +163,16 @@
overview/0
]).

%% Names of the sub-directories, joined onto the server data directory, that
%% are passed to ra_snapshot:init/6 as the snapshots and checkpoints dirs.
-define(SNAPSHOTS_DIR, <<"snapshots">>).
-define(CHECKPOINTS_DIR, <<"checkpoints">>).

%% Runs ra_snapshot:init/6 for the server identified by UId, using the
%% snapshots and checkpoints sub-directories of the server's data directory
%% (derived from the system config's data_dir and the UId).
%% The snapshot module and the maximum number of checkpoints are read from
%% Conf, falling back to ?DEFAULT_SNAPSHOT_MODULE and ?DEFAULT_MAX_CHECKPOINTS.
%% The value returned by ra_snapshot:init/6 is discarded; always returns ok.
pre_init(#{uid := UId,
system_config := #{data_dir := DataDir}} = Conf) ->
Dir = server_data_dir(DataDir, UId),
SnapModule = maps:get(snapshot_module, Conf, ?DEFAULT_SNAPSHOT_MODULE),
MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
CheckpointsDir = filename:join(Dir, "checkpoints"),
SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR),
CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR),
_ = ra_snapshot:init(UId, SnapModule, SnapshotsDir,
CheckpointsDir, undefined, MaxCheckpoints),
ok.
Expand All @@ -183,8 +195,8 @@ init(#{uid := UId,
CPInterval = maps:get(min_checkpoint_interval, Conf,
?MIN_CHECKPOINT_INTERVAL),
MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
CheckpointsDir = filename:join(Dir, "checkpoints"),
SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR),
CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR),
Counter = maps:get(counter, Conf, undefined),

%% ensure directories are there
Expand Down Expand Up @@ -303,7 +315,6 @@ init(#{uid := UId,
{SnapIdx, SnapTerm},
State#?MODULE.last_written_index_term
]),
?DEBUG("~ts: ra_log:init overview ~p", [overview(State)]),
element(1, delete_segments(SnapIdx, State)).

-spec close(state()) -> ok.
Expand Down Expand Up @@ -465,8 +476,9 @@ fold(From0, To0, Fun, Acc0,
fold(_From, _To, _Fun, Acc, State) ->
{Acc, State}.

%% read a list of indexes,
%% found indexes be returned in the same order as the input list of indexes
%% @doc Reads a list of indexes.
%% Found indexes are returned in the same order as the input list of indexes
%% @end
-spec sparse_read([ra_index()], state()) ->
{[log_entry()], state()}.
sparse_read(Indexes0, #?MODULE{cfg = Cfg,
Expand All @@ -488,8 +500,8 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,

%% drop any indexes that are larger than the last index available
Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1),
{Entries0, CacheNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, CacheNumRead),
{Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead),
{Entries1, Reader} = ra_log_reader:sparse_read(Reader0, Indexes, Entries0),
%% here we recover the original order of indexes
Entries = case Sort of
Expand All @@ -507,6 +519,65 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,
end,
{Entries, State#?MODULE{reader = Reader}}.


%% Reads the given indexes as far as they are available in the mem table and
%% returns a read plan covering the remaining indexes, so that those can be
%% read later with execute_read_plan/3.
kjnilsson marked this conversation as resolved.
Show resolved Hide resolved
%% Arguments:
%%   Indexes0     - the indexes to read; any order is accepted
%%   State        - the log state; it is only read, no updated state is returned
%%   TransformFun - applied to every entry that is served from the mem table
%% Returns a read plan record holding the log directory, a map of
%% index => transformed entry for everything already read from the mem table,
%% and the plan computed by ra_log_reader:read_plan/2 for the other indexes.
-spec partial_read([ra_index()], state(), transform_fun()) ->
    read_plan().
partial_read(Indexes0, #?MODULE{cfg = Cfg,
                                reader = Reader0,
                                last_index = LastIdx,
                                mem_table = Mt},
             TransformFun) ->
    ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPS, 1),
    %% indexes need to be sorted high -> low for correct and efficient reading
    Indexes1 = case ra_lib:lists_detect_sort(Indexes0) of
                   unsorted ->
                       lists:sort(fun erlang:'>'/2, Indexes0);
                   ascending ->
                       lists:reverse(Indexes0);
                   _ ->
                       % descending or undefined
                       Indexes0
               end,

    %% drop any indexes that are larger than the last index available;
    %% as the list is sorted high -> low these are all at the head
    Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1),
    %% NOTE(review): Indexes is presumably the subset of Indexes2 that was not
    %% found in the mem table - confirm against ra_mt:get_items/2
    {Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
    ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead),
    %% transform the mem table entries now and key them by index
    Read = lists:foldl(fun ({I, T, Cmd}, Acc) ->
                               Acc#{I => TransformFun(I, T, Cmd)}
                       end, #{}, Entries0),

    Plan = ra_log_reader:read_plan(Reader0, Indexes),
    #read_plan{dir = Cfg#cfg.directory,
               read = Read,
               plan = Plan}.


%% Executes a read plan.
%% Delegates to ra_log_reader:exec_read_plan/5, handing over the directory
%% and plan stored in the read plan, the caller supplied ra_flru state (or
%% `undefined'), the transform function and the map of entries that the read
%% plan already carries.
-spec execute_read_plan(read_plan(), undefined | ra_flru:state(),
                        TransformFun :: transform_fun()) ->
    {#{ra_index() => Command :: term()}, ra_flru:state()}.
execute_read_plan(#read_plan{} = ReadPlan, Flru0, TransformFun) ->
    #read_plan{dir = LogDir,
               read = AlreadyRead,
               plan = SegmentPlan} = ReadPlan,
    ra_log_reader:exec_read_plan(LogDir, SegmentPlan, Flru0,
                                 TransformFun, AlreadyRead).

%% Summarises a read plan as a map with the keys:
%%   num_read        - number of entries in the plan's `read' map
%%   num_in_segments - total number of indexes listed across all plan items
%%   num_segments    - number of items in the plan
-spec read_plan_info(read_plan()) -> map().
read_plan_info(#read_plan{read = AlreadyRead,
                          plan = SegmentPlan}) ->
    SegmentCount = length(SegmentPlan),
    %% each plan item is a 2-tuple whose second element is a list of indexes
    IdxCounts = lists:map(fun ({_, Idxs}) -> length(Idxs) end, SegmentPlan),
    #{num_read => map_size(AlreadyRead),
      num_in_segments => lists:sum(IdxCounts),
      num_segments => SegmentCount}.


-spec last_index_term(state()) -> ra_idxterm().
last_index_term(#?MODULE{last_index = LastIdx, last_term = LastTerm}) ->
{LastIdx, LastTerm}.
Expand Down Expand Up @@ -980,8 +1051,8 @@ overview(#?MODULE{last_index = LastIndex,

-spec write_config(ra_server:config(), state()) -> ok.
write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) ->
ConfigPath = filename:join(Dir, "config"),
TmpConfigPath = filename:join(Dir, "config.tmp"),
ConfigPath = filename:join(Dir, <<"config">>),
TmpConfigPath = filename:join(Dir, <<"config.tmp">>),
% clean config of potentially unserialisable data
Config = maps:without([parent,
counter,
Expand All @@ -994,12 +1065,12 @@ write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) ->
ok = prim_file:rename(TmpConfigPath, ConfigPath),
ok.

%% Reads the server config from the "config" file inside a directory.
%% Accepts either a log state, in which case the directory configured for
%% that log is used, or the directory path itself.
%% The file is read with ra_lib:consult/1 and its result is returned as is.
-spec read_config(state() | file:filename()) ->
-spec read_config(state() | file:filename_all()) ->
{ok, ra_server:config()} | {error, term()}.
read_config(#?MODULE{cfg = #cfg{directory = Dir}}) ->
read_config(Dir);
read_config(Dir) ->
ConfigPath = filename:join(Dir, "config"),
ConfigPath = filename:join(Dir, <<"config">>),
ra_lib:consult(ConfigPath).

-spec delete_everything(state()) -> ok.
Expand Down Expand Up @@ -1309,8 +1380,7 @@ put_counter(#cfg{counter = undefined}, _Ix, _N) ->
ok.

%% Builds the path of a server's data directory by joining the given
%% data directory with the server's UId.
server_data_dir(Dir, UId) ->
Me = ra_lib:to_list(UId),
filename:join(Dir, Me).
filename:join(Dir, UId).

maps_with_values(Keys, Map) ->
lists:foldr(
Expand Down
20 changes: 20 additions & 0 deletions src/ra_log_read_plan.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
%%
-module(ra_log_read_plan).


-export([execute/2,
info/1]).

%% @doc Executes a read plan.
%% Calls `ra_log:execute_read_plan/3' with
%% `ra_server:transform_for_partial_read/3' as the transform function.
%% `Flru' is either `undefined' or an `ra_flru' state; the result is a map
%% of index to command together with an `ra_flru' state.
%% @end
-spec execute(ra_log:read_plan(), undefined | ra_flru:state()) ->
    {#{ra:index() => Command :: ra_server:command()}, ra_flru:state()}.
execute(Plan, Flru) ->
    TransformFun = fun ra_server:transform_for_partial_read/3,
    ra_log:execute_read_plan(Plan, Flru, TransformFun).

%% @doc Returns the summary map that `ra_log:read_plan_info/1' produces
%% for the given read plan.
%% @end
-spec info(ra_log:read_plan()) -> map().
info(Plan) ->
    Info = ra_log:read_plan_info(Plan),
    Info.
Loading
Loading