Skip to content

Commit

Permalink
Detect when a segment has been modified.
Browse files Browse the repository at this point in the history
When executing a read plan it is possible that the read plan
refers to indexes not in the index of an segment that is still
being written to. This commit handles that change.
  • Loading branch information
kjnilsson committed Dec 17, 2024
1 parent 9fbb2e0 commit ed27d3c
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 39 deletions.
15 changes: 11 additions & 4 deletions src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,16 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0)
end,
lists:foldl(
fun ({Idxs, BaseName}, {Acc1, Open1}) ->
{Seg, Open} = get_segment_ext(Dir, Open1, BaseName),
{_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1),
{Acc, Open}
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName),
case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of
{ok, _, Acc} ->
{Acc, Open2};
{error, modified} ->
{_, Open3} = ra_flru:evict(BaseName, Open2),
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName),
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
{Acc, Open}
end
end, {Acc0, Open0}, Plan).

-spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}.
Expand Down Expand Up @@ -335,7 +342,7 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs,
lists:foldl(
fun ({Idxs, Fn}, {Open0, C, En0}) ->
{Seg, Open} = get_segment(Cfg, Open0, Fn),
{ReadSparseCount, Entries} =
{ok, ReadSparseCount, Entries} =
ra_log_segment:read_sparse(Seg, Idxs,
fun (I, T, B, Acc) ->
[{I, T, binary_to_term(B)} | Acc]
Expand Down
74 changes: 50 additions & 24 deletions src/ra_log_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
append/4,
sync/1,
fold/6,
is_modified/1,
read_sparse/4,
term_query/2,
close/1,
Expand All @@ -27,6 +28,8 @@

-include("ra.hrl").

-include_lib("kernel/include/file.hrl").

-define(VERSION, 2).
-define(MAGIC, "RASG").
-define(HEADER_SIZE, 4 + (16 div 8) + (16 div 8)).
Expand Down Expand Up @@ -112,6 +115,7 @@ open(Filename, Options) ->
end.

process_file(true, Mode, Filename, Fd, Options) ->
AccessPattern = maps:get(access_pattern, Options, random),
case read_header(Fd) of
{ok, Version, MaxCount} ->
MaxPending = maps:get(max_pending, Options, ?SEGMENT_MAX_PENDING),
Expand All @@ -120,7 +124,6 @@ process_file(true, Mode, Filename, Fd, Options) ->
{NumIndexRecords, DataOffset, Range, Index} =
recover_index(Fd, Version, MaxCount),
IndexOffset = ?HEADER_SIZE + NumIndexRecords * IndexRecordSize,
AccessPattern = maps:get(access_pattern, Options, random),
Mode = maps:get(mode, Options, append),
ComputeChecksums = maps:get(compute_checksums, Options, true),
{ok, #state{cfg = #cfg{version = Version,
Expand Down Expand Up @@ -184,16 +187,15 @@ append(#state{cfg = #cfg{max_pending = PendingCount},
append(#state{cfg = #cfg{version = Version,
mode = append} = Cfg,
index_offset = IndexOffset,
data_start = DataStart,
data_offset = DataOffset,
range = Range0,
pending_count = PendCnt,
pending_index = IdxPend0,
pending_data = DataPend0} = State,
Index, Term, {Length, Data}) ->
% check if file is full
case IndexOffset < DataStart of
true ->

case is_full(State) of
false ->
% TODO: check length is less than #FFFFFFFF ??
Checksum = compute_checksum(Cfg, Data),
OSize = offset_size(Version),
Expand All @@ -209,7 +211,7 @@ append(#state{cfg = #cfg{version = Version,
pending_data = [DataPend0, Data],
pending_count = PendCnt + 1}
};
false ->
true ->
{error, full}
end;
append(State, Index, Term, Data)
Expand Down Expand Up @@ -271,38 +273,58 @@ fold(#state{cfg = #cfg{mode = read} = Cfg,
FromIdx, ToIdx, Fun, AccFun, Acc) ->
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc).

-spec is_modified(state()) -> boolean().
is_modified(#state{cfg = #cfg{fd = Fd},
data_offset = DataOffset} = State) ->
case is_full(State) of
true ->
%% a full segment cannot be appended to.
false;
false ->
%% get info and compare to data_offset
{ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd),
Size > DataOffset
end.

-spec read_sparse(state(), [ra_index()],
fun((ra:index(), ra_term(), binary(), Acc) -> Acc),
Acc) ->
{NumRead :: non_neg_integer(), Acc}
{ok, NumRead :: non_neg_integer(), Acc} | {error, modified}
when Acc :: term().
read_sparse(#state{index = Index,
cfg = Cfg}, Indexes, AccFun, Acc) ->
Cache0 = prepare_cache(Cfg, Indexes, Index),
read_sparse0(Cfg, Indexes, Index, Cache0, Acc, AccFun, 0).
cfg = #cfg{fd = Fd}} = State,
Indexes, AccFun, Acc) ->
case is_modified(State) of
true ->
{error, modified};
false ->
Cache0 = prepare_cache(Fd, Indexes, Index),
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0)
end.

read_sparse0(_Cfg, [], _Index, _Cache, Acc, _AccFun, Num) ->
{Num, Acc};
read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) ->
{ok, Num, Acc};
read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
when is_map_key(NextIdx, Index) ->
{Term, Offset, Length, _} = map_get(NextIdx, Index),
case cache_read(Cache0, Offset, Length) of
{Term, Pos, Length, _} = map_get(NextIdx, Index),
case cache_read(Cache0, Pos, Length) of
false ->
case prepare_cache(Cfg, Indexes, Index) of
case prepare_cache(Fd, Indexes, Index) of
undefined ->
{ok, Data, _} = pread(Cfg, undefined, Offset, Length),
read_sparse0(Cfg, Rem, Index, undefined,
%% TODO: check for partial data?
{ok, Data} = file:pread(Fd, Pos, Length),
read_sparse0(Fd, Rem, Index, undefined,
AccFun(NextIdx, Term, Data, Acc),
AccFun, Num+1);
Cache ->
read_sparse0(Cfg, Indexes, Index, Cache,
Acc, AccFun, Num+1)
read_sparse0(Fd, Indexes, Index, Cache,
Acc, AccFun, Num)
end;
Data ->
read_sparse0(Cfg, Rem, Index, Cache0,
read_sparse0(Fd, Rem, Index, Cache0,
AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1)
end;
read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) ->
read_sparse0(_Fd, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) ->
exit({missing_key, NextIdx}).

cache_read({CPos, CLen, Bin}, Pos, Length)
Expand All @@ -313,9 +335,9 @@ cache_read({CPos, CLen, Bin}, Pos, Length)
cache_read(_, _, _) ->
false.

prepare_cache(#cfg{} = _Cfg, [_], _SegIndex) ->
prepare_cache(_Fd, [_], _SegIndex) ->
undefined;
prepare_cache(#cfg{fd = Fd} = _Cfg, [FirstIdx | Rem], SegIndex) ->
prepare_cache(Fd, [FirstIdx | Rem], SegIndex) ->
case consec_run(FirstIdx, FirstIdx, Rem) of
{Idx, Idx} ->
%% no run, no cache;
Expand Down Expand Up @@ -622,6 +644,10 @@ validate_checksum(0, _) ->
validate_checksum(Crc, Data) ->
Crc == erlang:crc32(Data).

is_full(#state{index_offset = IndexOffset,
data_start = DataStart}) ->
IndexOffset >= DataStart.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

Expand Down
2 changes: 1 addition & 1 deletion src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1728,7 +1728,7 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod},
become(leader, OldRaftState, #{cluster := Cluster,
cluster_change_permitted := CCP0,
log := Log0} = State) ->
Log = ra_log:release_resources(maps:size(Cluster) + 2, random, Log0),
Log = ra_log:release_resources(maps:size(Cluster), sequential, Log0),
CCP = case OldRaftState of
await_condition ->
CCP0;
Expand Down
16 changes: 16 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ all_tests() ->
transient_writer_is_handled,
read_opt,
sparse_read,
read_plan_modified,
read_plan,
sparse_read_out_of_range,
sparse_read_out_of_range_2,
Expand Down Expand Up @@ -481,6 +482,21 @@ sparse_read(Config) ->
{99, _, _}], _LogO3} = ra_log:sparse_read([1000,5,99], LogO2),
ok.

read_plan_modified(Config) ->
Log0 = ra_log_init(Config),
Log1 = write_and_roll(1, 2, 1, Log0, 50),
Log2 = deliver_all_log_events(Log1, 100),
Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end),
{#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined),

Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end),
%% assert we can read the newly appended item with the cached
%% segment
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru),
ra_log:close(Log),
ok.

read_plan(Config) ->
Num = 256 * 2,
Div = 2,
Expand Down
49 changes: 43 additions & 6 deletions test/ra_log_segment_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ all_tests() ->
overwrite,
term_query,
write_many,
read_sparse_append_read,
open_invalid,
corrupted_segment,
large_segment,
Expand Down Expand Up @@ -81,6 +82,7 @@ corrupted_segment(Config) ->
%% ct:pal("DUMP PRE ~p", [ra_log_segment:dump_index(Fn)]),
%% check that the current state throws a missing key
{ok, SegR0} = ra_log_segment:open(Fn, #{mode => read}),
?assertNot(ra_log_segment:is_modified(SegR0)),
?assertExit({missing_key, 2},
read_sparse(SegR0, [1, 2])),

Expand Down Expand Up @@ -210,11 +212,13 @@ segref(Config) ->
full_file(Config) ->
Dir = ?config(data_dir, Config),
Fn = filename:join(Dir, "seg1.seg"),
Data = make_data(1024),
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2}),
Data = make_data(10),
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2,
max_pending => 1}),
{ok, Seg1} = ra_log_segment:append(Seg0, 1, 2, Data),
{ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data),
{error, full} = ra_log_segment:append(Seg, 3, 2, Data),
?assertNot(ra_log_segment:is_modified(Seg)),
{1,2} = ra_log_segment:range(Seg),
ok = ra_log_segment:close(Seg),
ok.
Expand Down Expand Up @@ -396,6 +400,39 @@ write_many(Config) ->
ct:pal("~p", [Result]),
ok.


read_sparse_append_read(Config) ->
Dir = ?config(data_dir, Config),
Fn = filename:join(Dir, <<"0000000.segment">>),
{ok, W0} = ra_log_segment:open(Fn, #{}),
Data = <<"banana">>,
Term = 1,
%% write two entries in term 1
{ok, W1} = ra_log_segment:append(W0, 1, Term, Data),
{ok, W2} = ra_log_segment:append(W1, 2, Term, Data),
{ok, W3} = ra_log_segment:flush(W2),


{ok, R0} = ra_log_segment:open(Fn, #{mode => read}),
{ok, 2, [_, _]} = ra_log_segment:read_sparse(R0, [1, 2],
fun (I, _, _, Acc) ->
[I | Acc]
end, []),

?assertNot(ra_log_segment:is_modified(R0)),
%% overwrite in term 2
{ok, W4} = ra_log_segment:append(W3, 2, 2, <<"apple">>),
{ok, W5} = ra_log_segment:append(W4, 3, 2, <<"apple">>),
{ok, W} = ra_log_segment:flush(W5),
?assert(ra_log_segment:is_modified(R0)),
{error, modified} = ra_log_segment:read_sparse(R0, [2],
fun (_I, _, B, Acc) ->
[B | Acc]
end, []),
ra_log_segment:close(W),
ra_log_segment:close(R0),
ok.

write_until_full(Idx, Term, Data, Seg0) ->
case ra_log_segment:append(Seg0, Idx, Term, Data) of
{ok, Seg} ->
Expand All @@ -410,8 +447,8 @@ make_data(Size) ->
term_to_binary(crypto:strong_rand_bytes(Size)).

read_sparse(R, Idxs) ->
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun (I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun (I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
lists:reverse(Entries).
8 changes: 4 additions & 4 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,10 @@ segments_for(UId, DataDir) ->
SegFiles.

read_sparse(R, Idxs) ->
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun(I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun(I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
lists:reverse(Entries).

get_names(System) when is_atom(System) ->
Expand Down

0 comments on commit ed27d3c

Please sign in to comment.