diff --git a/src/katipo.erl b/src/katipo.erl index 0681e1c..7e570c5 100644 --- a/src/katipo.erl +++ b/src/katipo.erl @@ -316,9 +316,18 @@ sslcert = undefined :: undefined | binary() | file:name_all(), sslkey = undefined :: undefined | binary() | file:name_all(), sslkey_blob = undefined :: undefined | binary(), - keypasswd = undefined :: undefined | binary() + keypasswd = undefined :: undefined | binary(), + async = false :: boolean(), + reply_to = undefined :: pid() | atom() }). +-record(reply_to, { + async = false :: boolean(), + from :: {pid(), any()} | pid(), + tref = undefined :: undefined | reference(), + response_ref = undefined :: undefined | reference() + }). + tcp_fastopen_available() -> ?TCP_FASTOPEN_AVAILABLE. @@ -395,12 +404,16 @@ req(PoolName, Opts) case process_opts(Opts) of {ok, #req{url=undefined}} -> {error, error_map(bad_opts, <<"[{url,undefined}]">>)}; - {ok, Req} -> + {ok, Req=#req{async=Async, reply_to=ReplyTo}} -> Timeout = ?MODULE:get_timeout(Req), Req2 = Req#req{timeout=Timeout}, + Req3 = case {Async, ReplyTo} of + {true, undefined} -> Req2#req{reply_to=self()}; + {_, _} -> Req2 + end, Ts = os:timestamp(), {Result, {Response, Metrics}} = - wpool:call(PoolName, Req2, random_worker, infinity), + wpool:call(PoolName, Req3, random_worker, infinity), TotalUs = timer:now_diff(os:timestamp(), Ts), Metrics2 = katipo_metrics:notify({Result, Response}, Metrics, TotalUs), Response2 = maybe_return_metrics(Req2, Metrics2, Response), @@ -455,7 +468,9 @@ handle_call(#req{method = Method, sslcert = SSLCert, sslkey = SSLKey, sslkey_blob = SSLKeyBlob, - keypasswd = KeyPasswd}, + keypasswd = KeyPasswd, + async = Async, + reply_to = PidOrAtom}, From, State=#state{port=Port, reqs=Reqs}) -> {Self, Ref} = From, @@ -484,9 +499,22 @@ handle_call(#req{method = Method, {?keypasswd, KeyPasswd}], Command = {Self, Ref, Method, Url, Headers, CookieJar, Body, Opts}, true = port_command(Port, term_to_binary(Command)), - Tref = erlang:start_timer(Timeout, self(), {req_timeout, From}), - Reqs2 = maps:put(From, Tref, Reqs), - {noreply, State#state{reqs=Reqs2}}. + ReplyTo = + case Async of + false -> + Tref = erlang:start_timer(Timeout, self(), {req_timeout, From}), + #reply_to{async=Async, from=From, tref=Tref}; + true -> + #reply_to{async=Async, from=PidOrAtom, response_ref=make_ref()} + end, + Reqs2 = maps:put(From, ReplyTo, Reqs), + State2 = State#state{reqs=Reqs2}, + case Async of + false -> + {noreply, State2}; + true -> + {reply, {ok, ReplyTo#reply_to.response_ref}, State2} + end. handle_cast(Msg, State) -> error_logger:error_msg("Unexpected cast: ~p", [Msg]), @@ -506,9 +534,12 @@ handle_info({Port, {data, Data}}, State=#state{port=Port, reqs=Reqs}) -> {error, {From0, {Error, Metrics}}} end, case maps:find(From, Reqs) of - {ok, Tref} -> + {ok, #reply_to{async=false, from=From, tref=Tref}} -> _ = erlang:cancel_timer(Tref), _ = gen_server:reply(From, {Result, Response}); + {ok, #reply_to{async=true, from=PidOrAtom, response_ref=ResponseRef}} -> + PidOrAtom ! {katipo_response, ResponseRef, {Result, Response}}, + ok; error -> ok end, @@ -704,6 +735,10 @@ opt(sslkey_blob, Key, {Req, Errors}) {Req#req{sslkey_blob=Key}, Errors}; opt(keypasswd, Pass, {Req, Errors}) when is_binary(Pass) -> {Req#req{keypasswd=Pass}, Errors}; +opt(async, Async, {Req, Errors}) when is_boolean(Async) -> + {Req#req{async=Async}, Errors}; +opt(reply_to, PidOrAtom, {Req, Errors}) when is_pid(PidOrAtom) orelse is_atom(PidOrAtom) -> + {Req#req{reply_to=PidOrAtom}, Errors}; opt(K, V, {Req, Errors}) -> {Req, [{K, V} | Errors]}.