diff --git a/src/gun.erl b/src/gun.erl index db6965bb..efa02417 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -199,7 +199,8 @@ -type req_opts() :: #{ flow => pos_integer(), reply_to => pid(), - tunnel => stream_ref() + tunnel => stream_ref(), + timeout => timeout() }. -export_type([req_opts/0]). @@ -634,8 +635,9 @@ headers(ServerPid, Method, Path, Headers0, ReqOpts) -> StreamRef = make_stream_ref(Tunnel), InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), + Timeout = maps:get(timeout, ReqOpts, infinity), gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef, - Method, Path, normalize_headers(Headers0), InitialFlow}), + Method, Path, normalize_headers(Headers0), InitialFlow, Timeout}), StreamRef. -spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> stream_ref(). @@ -648,8 +650,9 @@ request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef = make_stream_ref(Tunnel), InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), + Timeout = maps:get(timeout, ReqOpts, infinity), gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef, - Method, Path, normalize_headers(Headers), Body, InitialFlow}), + Method, Path, normalize_headers(Headers), Body, InitialFlow, Timeout}), StreamRef. get_tunnel(#{tunnel := Tunnel}) when is_reference(Tunnel) -> @@ -700,8 +703,9 @@ connect(ServerPid, Destination, Headers, ReqOpts) -> StreamRef = make_stream_ref(Tunnel), InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), + Timeout = maps:get(timeout, ReqOpts, infinity), gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, - Destination, Headers, InitialFlow}), + Destination, Headers, InitialFlow, Timeout}), StreamRef. %% Awaiting gun messages. @@ -1295,34 +1299,34 @@ connected_ws_only(Type, Event, State) -> %% %% @todo It might be better, internally, to pass around a URIMap %% containing the target URI, instead of separate Host/Port/PathWithQs. -connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow}, +connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow, Timeout}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, CookieStore, EvHandlerState} = Protocol:headers(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Method, Host, Port, Path, Headers, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout), commands(Commands, State#state{cookie_store=CookieStore, event_handler_state=EvHandlerState}); -connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow}, +connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow, Timeout}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, CookieStore, EvHandlerState} = Protocol:request(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Method, Host, Port, Path, Headers, Body, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout), commands(Commands, State#state{cookie_store=CookieStore, event_handler_state=EvHandlerState}); -connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, +connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow, Timeout}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, EvHandlerState} = Protocol:connect(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Destination, #{host => Host, port => Port}, - Headers, InitialFlow, EvHandler, EvHandlerState0), + Headers, InitialFlow, EvHandler, EvHandlerState0, Timeout), commands(Commands, State#state{event_handler_state=EvHandlerState}); %% Public Websocket interface. connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) -> @@ -1387,11 +1391,11 @@ closing(state_timeout, closing_timeout, State=#state{status=Status}) -> end, disconnect(State, Reason); %% When reconnect is disabled, fail HTTP/Websocket operations immediately. -closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow}, +closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow, _Timeout}, State=#state{opts=#{retry := 0}}) -> ReplyTo ! {gun_error, self(), StreamRef, closing}, {keep_state, State}; -closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow}, +closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow, _Timeout}, State=#state{opts=#{retry := 0}}) -> ReplyTo ! {gun_error, self(), StreamRef, closing}, {keep_state, State}; diff --git a/src/gun_http.erl b/src/gun_http.erl index 58f4ed67..704d5782 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -26,11 +26,12 @@ -export([closing/4]). -export([close/4]). -export([keepalive/3]). --export([headers/12]). --export([request/13]). +-export([headers/13]). +-export([request/14]). -export([data/7]). --export([connect/9]). +-export([connect/10]). -export([cancel/5]). +-export([timeout/3]). -export([stream_info/2]). -export([down/1]). -export([ws_upgrade/11]). @@ -62,7 +63,8 @@ path :: iodata(), is_alive :: boolean(), - handler_state :: undefined | gun_content_handler:state() + handler_state :: undefined | gun_content_handler:state(), + timer_ref :: undefined | reference() }). -record(http_state, { @@ -545,7 +547,8 @@ close_streams(_, [], _) -> ok; close_streams(State, [#stream{is_alive=false}|Tail], Reason) -> close_streams(State, Tail, Reason); -close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> +close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo, timer_ref=TimerRef}|Tail], Reason) -> + cancel_stream_timer(TimerRef), ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, close_streams(State, Tail, Reason). @@ -561,14 +564,14 @@ keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandle keepalive(_State, _, EvHandlerState) -> {[], EvHandlerState}. -headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState) +headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState, _) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, {[], CookieStore, EvHandlerState}; headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, - InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> + InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, Timeout) -> {SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), @@ -576,20 +579,20 @@ headers(State=#http_state{opts=Opts, out=head}, ok -> InitialFlow = initial_flow(InitialFlow0, Opts), {state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, - ReplyTo, Method, Authority, Path, InitialFlow)}; + ReplyTo, Method, Authority, Path, InitialFlow, Timeout)}; Error={error, _} -> Error end, {Command, CookieStore, EvHandlerState}. -request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState) +request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState, _) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, {[], CookieStore, EvHandlerState}; request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, - InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> + InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, Timeout) -> {SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), @@ -597,7 +600,7 @@ request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, ok -> InitialFlow = initial_flow(InitialFlow0, Opts), {state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, - ReplyTo, Method, Authority, Path, InitialFlow)}; + ReplyTo, Method, Authority, Path, InitialFlow, Timeout)}; Error={error, _} -> Error end, @@ -760,19 +763,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {[], EvHandlerState0} end. -connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) +connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState, _) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, {[], EvHandlerState}; -connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState, _) when Streams =/= [] -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, {[], EvHandlerState}; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0, - EvHandler, EvHandlerState0) -> + EvHandler, EvHandlerState0, Timeout) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 @@ -817,7 +820,7 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), {{state, new_stream(State, {connect, StreamRef, Destination}, - ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)}, + ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow, Timeout)}, EvHandlerState}; Error={error, _} -> {Error, EvHandlerState1} @@ -840,6 +843,17 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> {[], EvHandlerState0} end. +timeout(State0=#http_state{streams=Streams}, {?MODULE, stream_timeout, StreamRef}, TRef) -> + case lists:keyfind(StreamRef, #stream.ref, Streams) of + #stream{reply_to=ReplyTo, timer_ref=TRef} -> + error_stream_timeout(State0, StreamRef, ReplyTo), + State = cancel_stream(State0, StreamRef), + {state, State}; + _ -> + %% Ignore non-existing streams and streams where TRef doesn't match. + [] + end. + stream_info(#http_state{streams=Streams}, StreamRef) -> case lists:keyfind(StreamRef, #stream.ref, Streams) of #stream{reply_to=ReplyTo, is_alive=IsAlive} -> @@ -862,6 +876,11 @@ down(#http_state{streams=Streams}) -> _ -> Ref end || #stream{ref=Ref} <- Streams]. +error_stream_timeout(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {timeout, + "The stream has timed out."}}, + ok. + error_stream_closed(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, @@ -934,11 +953,13 @@ stream_ref(#websocket{ref=StreamRef}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, - Method, Authority, Path, InitialFlow) -> + Method, Authority, Path, InitialFlow, Timeout) -> + TimerRef = start_stream_timer(StreamRef, Timeout), State#http_state{streams=Streams ++ [#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, method=iolist_to_binary(Method), authority=Authority, - path=iolist_to_binary(Path), is_alive=true}]}. + path=iolist_to_binary(Path), is_alive=true, + timer_ref=TimerRef}]}. is_stream(#http_state{streams=Streams}, StreamRef) -> lists:keymember(StreamRef, #stream.ref, Streams). @@ -946,15 +967,28 @@ is_stream(#http_state{streams=Streams}, StreamRef) -> cancel_stream(State=#http_state{streams=Streams}, StreamRef) -> Streams2 = [case Ref of StreamRef -> - Tuple#stream{is_alive=false}; + cancel_stream_timer(TimerRef), + Tuple#stream{is_alive=false, timer_ref=undefined}; _ -> Tuple - end || Tuple = #stream{ref=Ref} <- Streams], + end || Tuple = #stream{ref=Ref, timer_ref=TimerRef} <- Streams], State#http_state{streams=Streams2}. -end_stream(State=#http_state{streams=[_|Tail]}) -> +end_stream(State=#http_state{streams=[#stream{timer_ref=TimerRef}|Tail]}) -> + cancel_stream_timer(TimerRef), State#http_state{in=head, streams=Tail}. +start_stream_timer(_StreamRef, infinity) -> + undefined; +start_stream_timer(StreamRef, Timeout) -> + erlang:start_timer(Timeout, self(), {?MODULE, stream_timeout, StreamRef}). + +cancel_stream_timer(undefined) -> + ok; +cancel_stream_timer(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + ok. + %% Websocket upgrade. ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) @@ -999,7 +1033,8 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, {state, new_stream(State#http_state{connection=Conn, out=Out}, #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts}, - ReplyTo, <<"GET">>, Authority, Path, InitialFlow)}; + ReplyTo, <<"GET">>, Authority, Path, InitialFlow, + infinity)}; Error={error, _} -> Error end, diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 4b35c761..528ec634 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -27,10 +27,10 @@ -export([closing/4]). -export([close/4]). -export([keepalive/3]). --export([headers/12]). --export([request/13]). +-export([headers/13]). +-export([request/14]). -export([data/7]). --export([connect/9]). +-export([connect/10]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). @@ -79,7 +79,10 @@ handler_state :: undefined | gun_content_handler:state(), %% CONNECT tunnel. - tunnel :: undefined | #tunnel{} + tunnel :: undefined | #tunnel{}, + + %% Request timeout. + timer_ref :: undefined | reference() }). -record(http2_state, { @@ -934,8 +937,8 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport, pings_unack=Pin headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) - when is_reference(StreamRef) -> + Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, + Timeout) when is_reference(StreamRef) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers( @@ -958,8 +961,10 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), + TimerRef = start_stream_timer(StreamRef, Timeout), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, - flow=InitialFlow, authority=Authority, path=Path}, + flow=InitialFlow, authority=Authority, path=Path, + timer_ref=TimerRef}, {{state, create_stream(State#http2_state{ http2_machine=HTTP2Machine}, Stream)}, CookieStore, EvHandlerState}; @@ -968,14 +973,15 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, end; %% Tunneled request. headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, - Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> + Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0, + Timeout) -> case get_stream_by_ref(State, StreamRef) of %% @todo We should send an error to the user if the stream isn't ready. Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ origin_host := OriginHost, origin_port := OriginPort}}} -> {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef, ReplyTo, Method, OriginHost, OriginPort, Path, Headers, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout), {ResCommands, EvHandlerState} = tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1), {ResCommands, CookieStore, EvHandlerState}; @@ -990,8 +996,8 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) - when is_reference(StreamRef) -> + Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, + Timeout) when is_reference(StreamRef) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( @@ -1020,8 +1026,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, ok -> EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), + TimerRef = start_stream_timer(StreamRef, Timeout), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path=Path}, + authority=Authority, path=Path, timer_ref=TimerRef}, State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream), case IsFin of fin -> @@ -1033,6 +1040,10 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, nofin -> {StateOrError, EvHandlerStateRet} = maybe_send_data( State, StreamID, fin, Body, EvHandler, EvHandlerState), + case StateOrError of + {state, _} -> ok; + {error, _} -> cancel_stream_timer(TimerRef) + end, {StateOrError, CookieStore, EvHandlerStateRet} end; Error={error, _} -> @@ -1040,14 +1051,15 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, end; %% Tunneled request. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, - Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> + Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0, + Timeout) -> case get_stream_by_ref(State, StreamRef) of %% @todo We should send an error to the user if the stream isn't ready. Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ origin_host := OriginHost, origin_port := OriginPort}}} -> {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef, ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout), {ResCommands, EvHandlerState} = tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1), {ResCommands, CookieStore, EvHandlerState}; @@ -1221,7 +1233,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, case Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)) of ok -> case take_stream(State0, StreamID) of - {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> + {#stream{ref=StreamRef, reply_to=ReplyTo, timer_ref=TimerRef}, State} -> + cancel_stream_timer(TimerRef), ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, {state, State}; error -> @@ -1234,7 +1247,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0, - EvHandler, EvHandlerState0) + EvHandler, EvHandlerState0, Timeout) when is_reference(StreamRef) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); @@ -1281,9 +1294,11 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), + TimerRef = start_stream_timer(StreamRef, Timeout), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path= <<>>, - tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, + tunnel=#tunnel{destination=Destination, info=TunnelInfo}, + timer_ref=TimerRef}, {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, EvHandlerState}; Error={error, _} -> @@ -1291,13 +1306,13 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, end; %% Tunneled request. connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, - EvHandler, EvHandlerState0) -> + EvHandler, EvHandlerState0, Timeout) -> case get_stream_by_ref(State, StreamRef) of %% @todo Should we send an error to the user if the stream isn't ready. Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> {Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef, ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, - EvHandler, EvHandlerState0), + EvHandler, EvHandlerState0, Timeout), tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1); #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, @@ -1347,6 +1362,22 @@ cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) {[], EvHandlerState0} end. +timeout(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, {?MODULE, stream_timeout, StreamRef}, TRef) -> + case get_stream_by_ref(State, StreamRef) of + #stream{id=StreamID, reply_to=ReplyTo, timer_ref=TRef} -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), + case Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)) of + ok -> + error_stream_timeout(State, StreamRef, ReplyTo), + {state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, + StreamID)}; + Error={error, _} -> + Error + end; + %% We ignore timeout events for streams that no longer exist. + error -> + {state, State} + end; timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) -> case cow_http2_machine:timeout(Name, TRef, HTTP2Machine0) of {ok, HTTP2Machine} -> @@ -1526,6 +1557,11 @@ connection_error(#http2_state{socket=Socket, transport=Transport, %% Stream functions. +error_stream_timeout(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {timeout, + "The stream has timed out."}}, + ok. + error_stream_closed(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, @@ -1592,8 +1628,20 @@ maybe_delete_stream(State, _, _, _) -> State. delete_stream(State=#http2_state{streams=Streams, stream_refs=Refs}, StreamID) -> - #{StreamID := #stream{ref=StreamRef}} = Streams, + #{StreamID := #stream{ref=StreamRef, timer_ref=TimerRef}} = Streams, + cancel_stream_timer(TimerRef), State#http2_state{ streams=maps:remove(StreamID, Streams), stream_refs=maps:remove(StreamRef, Refs) }. + +start_stream_timer(_StreamRef, infinity) -> + undefined; +start_stream_timer(StreamRef, Timeout) -> + erlang:start_timer(Timeout, self(), {?MODULE, stream_timeout, StreamRef}). + +cancel_stream_timer(undefined) -> + ok; +cancel_stream_timer(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + ok. diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index c7fecc7e..b58a4eb8 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -24,10 +24,10 @@ -export([closing/4]). -export([close/4]). -export([keepalive/3]). --export([headers/12]). --export([request/13]). +-export([headers/13]). +-export([request/14]). -export([data/7]). --export([connect/9]). +-export([connect/10]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). @@ -288,11 +288,12 @@ keepalive(_State, _EvHandler, EvHandlerState) -> %% We pass the headers forward and optionally dereference StreamRef. headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, StreamRef0, ReplyTo, Method, Host, Port, Path, Headers, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, + Timeout) -> StreamRef = maybe_dereference(State, StreamRef0), {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout), {ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), {ResCommands, CookieStore, EvHandlerState}. @@ -300,11 +301,11 @@ headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0, info=#{origin_host := OriginHost, origin_port := OriginPort}}, StreamRef0, ReplyTo, Method, _Host, _Port, Path, Headers, Body, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout) -> StreamRef = maybe_dereference(State, StreamRef0), {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, StreamRef, ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout), {ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), {ResCommands, CookieStore, EvHandlerState}. @@ -344,11 +345,11 @@ data(State=#tunnel_state{socket=Socket, transport=Transport, connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port}, protocol=Proto, protocol_state=ProtoState0}, StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow, - EvHandler, EvHandlerState0) -> + EvHandler, EvHandlerState0, Timeout) -> StreamRef = maybe_dereference(State, StreamRef0), {Commands, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef, ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow, - EvHandler, EvHandlerState0), + EvHandler, EvHandlerState0, Timeout), {ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), {ResCommands, EvHandlerState}.