Skip to content

Commit

Permalink
Implement decode stream (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamthome authored Aug 6, 2024
1 parent b7c8bd5 commit 0988fb8
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 0 deletions.
1 change: 1 addition & 0 deletions elvis.config
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
allow => [
'assert',
'assertEqual',
'assertMatch',
'assertError',
'assertNotException'
]
Expand Down
43 changes: 43 additions & 0 deletions src/euneus.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
-export([decode/2]).
-export([decode_iodata/1]).
-export([decode_iodata/2]).
-export([decode_stream_start/1]).
-export([decode_stream_start/2]).
-export([decode_stream_continue/2]).
-export([minify/1]).
-export([format/2]).

Expand All @@ -25,6 +28,9 @@
-ignore_xref([decode/2]).
-ignore_xref([decode_iodata/1]).
-ignore_xref([decode_iodata/2]).
-ignore_xref([decode_stream_start/1]).
-ignore_xref([decode_stream_start/2]).
-ignore_xref([decode_stream_continue/2]).
-ignore_xref([minify/1]).
-ignore_xref([format/2]).

Expand Down Expand Up @@ -150,6 +156,43 @@ decode_iodata(JSON) ->
decode_iodata(JSON, Opts) ->
euneus_decoder:decode(iolist_to_binary(JSON), Opts).

-spec decode_stream_start(JSON) -> Result when
JSON :: binary(),
Result :: euneus_decoder:stream_result().
%% @equiv decode_stream_start(JSON, #{})
decode_stream_start(JSON) ->
decode_stream_start(JSON, #{}).

-spec decode_stream_start(JSON, Options) -> Result when
JSON :: binary(),
Options :: euneus_decoder:options(),
Result :: euneus_decoder:stream_result().
%% @equiv euneus_decoder:stream_start(JSON, Options)
%%
%% @doc Begin parsing a stream of bytes of a JSON value.
decode_stream_start(JSON, Opts) ->
euneus_decoder:stream_start(JSON, Opts).

-spec decode_stream_continue(JSON, State) -> Result when
JSON :: binary() | end_of_input,
State :: euneus_decoder:stream_state(),
Result :: euneus_decoder:stream_result().
%% @equiv euneus_decoder:stream_continue(JSON, State)
%%
%% @doc Continue parsing a stream of bytes of a JSON value.
%%
%% <em>Example:</em>
%%
%% ```
%% 1> begin
%% .. {continue, State} = euneus:decode_stream_start(<<"{\"foo\":">>),
%% .. euneus:decode_stream_continue(<<"1}">>, State)
%% .. end.
%% {end_of_input,#{<<"foo">> => 1}}
%% '''
decode_stream_continue(JSON, State) ->
euneus_decoder:stream_continue(JSON, State).

-spec minify(JSON) -> binary() when
JSON :: binary().
%% @doc Minifies a binary JSON.
Expand Down
52 changes: 52 additions & 0 deletions src/euneus_decoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
%% --------------------------------------------------------------------

-export([decode/2]).
-export([stream_start/2]).
-export([stream_continue/2]).

%% --------------------------------------------------------------------
%% Macros
Expand All @@ -29,6 +31,8 @@
-export_type([options/0]).
-export_type([codec/0]).
-export_type([codec_callback/0]).
-export_type([stream_state/0]).
-export_type([stream_result/0]).

%% --------------------------------------------------------------------
%% Types
Expand Down Expand Up @@ -73,6 +77,13 @@

-type codec_callback() :: fun((binary()) -> next | {halt, term()}).

% The correct type is 'json:continuation_state()', but dialyzer says it is wrong.
-type stream_state() :: term().

-type stream_result() ::
{continue, json:continuation_state()}
| {end_of_input, term()}.

%% --------------------------------------------------------------------
%% DocTest
%% --------------------------------------------------------------------
Expand Down Expand Up @@ -363,6 +374,42 @@ decode(JSON, Opts) when is_binary(JSON), is_map(Opts) ->
Decoders = decoders(Codecs, Opts),
decode(Codecs, JSON, Decoders).

-spec stream_start(JSON, Options) -> stream_result() when
JSON :: binary(),
Options :: options().
%% @doc Begin parsing a stream of bytes of a JSON value.
%%
%% Similar to `decode/2' but returns `{end_of_input, Term}' when a complete
%% JSON value is parsed or returns `{continue, State}' for incomplete data.
%%
%% The State can be fed to the `stream_continue/2' function when more data is available.
stream_start(JSON, Opts) ->
Codecs = maps:get(codecs, Opts, []),
Decoders = decoders(Codecs, Opts),
stream_result(json:decode_start(JSON, Codecs, Decoders)).

-spec stream_continue(JSON, State) -> stream_result() when
JSON :: binary() | end_of_input,
State :: stream_state().
%% @doc Continue parsing a stream of bytes of a JSON value.
%%
%% Similar to `stream_start/2', if the function returns `{continue, State}'
%% and there is no more data, use `end_of_input' instead of a binary.
%%
%% <em>Example:</em>
%%
%% ```
%% 1> begin
%% .. {continue, State} = euneus_decoder:stream_start(<<"{\"foo\":">>, #{}),
%% .. euneus_decoder:stream_continue(<<"1}">>, State)
%% .. end.
%% {end_of_input,#{<<"foo">> => 1}}
%% '''
stream_continue(<<>>, State) ->
stream_result(json:decode_continue(end_of_input, State));
stream_continue(JSON, State) ->
stream_result(json:decode_continue(JSON, State)).

%% --------------------------------------------------------------------
%% Internal functions
%% --------------------------------------------------------------------
Expand All @@ -375,6 +422,11 @@ decode(Codecs, JSON, Decoders) ->
invalid_byte(Rest, 0)
end.

stream_result({continue, State}) ->
{continue, State};
stream_result({Result, Codecs, <<>>}) ->
{end_of_input, traverse_codecs(Codecs, Result)}.

% This is a copy of json:invalid_byte/2, since it is not exported.
invalid_byte(Bin, Skip) ->
Byte = binary:at(Bin, Skip),
Expand Down
11 changes: 11 additions & 0 deletions test/euneus_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ decode_iodata_test(Config) when is_list(Config) ->
?assertEqual([<<"foo">>], euneus:decode_iodata([$[, [$", <<"foo">>, $"], $]], #{}))
].

decode_stream_test(Config) when is_list(Config) ->
State = stream_continue_state(euneus:decode_stream_start(<<"{\"foo\":">>)),
?assertEqual(
{end_of_input, #{<<"foo">> => 1}}, euneus:decode_stream_continue(<<"1}">>, State)
).

% Wrapper to suppress dialyzer errors.
% See notes in euneus_decoder_SUITE:stream_continue_state/1.
stream_continue_state({continue, State}) ->
State.

minify_test(Config) when is_list(Config) ->
?assertEqual(
<<"{\"foo\":\"bar\",\"0\":0,[null,true,false,0.001,\"foo\",{\"foo\":0.0001]}">>,
Expand Down
30 changes: 30 additions & 0 deletions test/euneus_decoder_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,36 @@ assert("n_" ++ _, JSON) ->
assert("i_" ++ _, _JSON) ->
?assert(true).

stream_start_test(Config) when is_list(Config) ->
[
?assertMatch({continue, _}, euneus_decoder:stream_start(<<"{\"foo\":">>, #{})),
?assertEqual({end_of_input, <<"foo">>}, euneus_decoder:stream_start(<<"\"foo\"">>, #{}))
].

stream_continue_test(Config) when is_list(Config) ->
State1 = stream_continue_state(euneus_decoder:stream_start(<<"{\"foo\":">>, #{})),
State2 = stream_continue_state(euneus_decoder:stream_start(<<"123">>, #{})),
[
?assertMatch({continue, _}, euneus_decoder:stream_continue(<<"1">>, State1)),
?assertEqual(
{end_of_input, #{<<"foo">> => 1}}, euneus_decoder:stream_continue(<<"1}">>, State1)
),
?assertError(unexpected_end, euneus_decoder:stream_continue(end_of_input, State1)),
?assertEqual({end_of_input, 123}, euneus_decoder:stream_continue(<<>>, State2))
].

% Wrapper to suppress dialyzer errors:
% ```
% test/euneus_decoder_SUITE.erl
% Line 60 Column 1: Function stream_continue_test/1 has no local return
% Line 64 Column 10: The created fun has no local return
% Line 64 Column 77: The call euneus_decoder:stream_continue(<<49>>,
% State1::json:continuation_state()) contains an opaque term as 2nd argument
% when terms of different types are expected in these positions
% '''
stream_continue_state({continue, State}) ->
State.

codecs_test(Config) when is_list(Config) ->
[
?assertEqual([], decode(<<"[]">>, #{codecs => []})),
Expand Down

0 comments on commit 0988fb8

Please sign in to comment.