Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request timeout (WIP) #330

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions src/gun.erl
Original file line number Diff line number Diff line change
@@ -199,7 +199,8 @@
-type req_opts() :: #{
flow => pos_integer(),
reply_to => pid(),
tunnel => stream_ref()
tunnel => stream_ref(),
timeout => timeout()
}.
-export_type([req_opts/0]).

@@ -634,8 +635,9 @@ headers(ServerPid, Method, Path, Headers0, ReqOpts) ->
StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
Timeout = maps:get(timeout, ReqOpts, infinity),
gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef,
Method, Path, normalize_headers(Headers0), InitialFlow}),
Method, Path, normalize_headers(Headers0), InitialFlow, Timeout}),
StreamRef.

-spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> stream_ref().
@@ -648,8 +650,9 @@ request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
Timeout = maps:get(timeout, ReqOpts, infinity),
gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef,
Method, Path, normalize_headers(Headers), Body, InitialFlow}),
Method, Path, normalize_headers(Headers), Body, InitialFlow, Timeout}),
StreamRef.

get_tunnel(#{tunnel := Tunnel}) when is_reference(Tunnel) ->
@@ -700,8 +703,9 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->
StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
Timeout = maps:get(timeout, ReqOpts, infinity),
gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef,
Destination, Headers, InitialFlow}),
Destination, Headers, InitialFlow, Timeout}),
StreamRef.

%% Awaiting gun messages.
@@ -1295,34 +1299,34 @@ connected_ws_only(Type, Event, State) ->
%%
%% @todo It might be better, internally, to pass around a URIMap
%% containing the target URI, instead of separate Host/Port/PathWithQs.
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow},
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow, Timeout},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow},
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow, Timeout},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow, Timeout},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
Headers, InitialFlow, EvHandler, EvHandlerState0),
Headers, InitialFlow, EvHandler, EvHandlerState0, Timeout),
commands(Commands, State#state{event_handler_state=EvHandlerState});
%% Public Websocket interface.
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
@@ -1387,11 +1391,11 @@ closing(state_timeout, closing_timeout, State=#state{status=Status}) ->
end,
disconnect(State, Reason);
%% When reconnect is disabled, fail HTTP/Websocket operations immediately.
closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow},
closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow, _Timeout},
State=#state{opts=#{retry := 0}}) ->
ReplyTo ! {gun_error, self(), StreamRef, closing},
{keep_state, State};
closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow},
closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow, _Timeout},
State=#state{opts=#{retry := 0}}) ->
ReplyTo ! {gun_error, self(), StreamRef, closing},
{keep_state, State};
77 changes: 56 additions & 21 deletions src/gun_http.erl
Original file line number Diff line number Diff line change
@@ -26,11 +26,12 @@
-export([closing/4]).
-export([close/4]).
-export([keepalive/3]).
-export([headers/12]).
-export([request/13]).
-export([headers/13]).
-export([request/14]).
-export([data/7]).
-export([connect/9]).
-export([connect/10]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([down/1]).
-export([ws_upgrade/11]).
@@ -62,7 +63,8 @@
path :: iodata(),

is_alive :: boolean(),
handler_state :: undefined | gun_content_handler:state()
handler_state :: undefined | gun_content_handler:state(),
timer_ref :: undefined | reference()
}).

-record(http_state, {
@@ -545,7 +547,8 @@ close_streams(_, [], _) ->
ok;
close_streams(State, [#stream{is_alive=false}|Tail], Reason) ->
close_streams(State, Tail, Reason);
close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo, timer_ref=TimerRef}|Tail], Reason) ->
cancel_stream_timer(TimerRef),
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
close_streams(State, Tail, Reason).

@@ -561,43 +564,43 @@ keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandle
keepalive(_State, _, EvHandlerState) ->
{[], EvHandlerState}.

headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState, _)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{[], CookieStore, EvHandlerState};
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, Timeout) ->
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
Command = case SendResult of
ok ->
InitialFlow = initial_flow(InitialFlow0, Opts),
{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)};
ReplyTo, Method, Authority, Path, InitialFlow, Timeout)};
Error={error, _} ->
Error
end,
{Command, CookieStore, EvHandlerState}.

request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState, _)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{[], CookieStore, EvHandlerState};
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, Timeout) ->
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
Command = case SendResult of
ok ->
InitialFlow = initial_flow(InitialFlow0, Opts),
{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)};
ReplyTo, Method, Authority, Path, InitialFlow, Timeout)};
Error={error, _} ->
Error
end,
@@ -760,19 +763,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{[], EvHandlerState0}
end.

connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState, _)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{[], EvHandlerState};
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState, _)
when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
{[], EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
EvHandler, EvHandlerState0) ->
EvHandler, EvHandlerState0, Timeout) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
@@ -817,7 +820,7 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State, {connect, StreamRef, Destination},
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)},
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow, Timeout)},
EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState1}
@@ -840,6 +843,17 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
{[], EvHandlerState0}
end.

timeout(State0=#http_state{streams=Streams}, {?MODULE, stream_timeout, StreamRef}, TRef) ->
case lists:keyfind(StreamRef, #stream.ref, Streams) of
#stream{reply_to=ReplyTo, timer_ref=TRef} ->
error_stream_timeout(State0, StreamRef, ReplyTo),
State = cancel_stream(State0, StreamRef),
{state, State};
_ ->
%% Ignore non-existing streams and streams where TRef doesn't match.
[]
end.

stream_info(#http_state{streams=Streams}, StreamRef) ->
case lists:keyfind(StreamRef, #stream.ref, Streams) of
#stream{reply_to=ReplyTo, is_alive=IsAlive} ->
@@ -862,6 +876,11 @@ down(#http_state{streams=Streams}) ->
_ -> Ref
end || #stream{ref=Ref} <- Streams].

error_stream_timeout(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {timeout,
"The stream has timed out."}},
ok.

error_stream_closed(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
@@ -934,27 +953,42 @@ stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.

new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow) ->
Method, Authority, Path, InitialFlow, Timeout) ->
TimerRef = start_stream_timer(StreamRef, Timeout),
State#http_state{streams=Streams
++ [#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
method=iolist_to_binary(Method), authority=Authority,
path=iolist_to_binary(Path), is_alive=true}]}.
path=iolist_to_binary(Path), is_alive=true,
timer_ref=TimerRef}]}.

is_stream(#http_state{streams=Streams}, StreamRef) ->
lists:keymember(StreamRef, #stream.ref, Streams).

cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
Streams2 = [case Ref of
StreamRef ->
Tuple#stream{is_alive=false};
cancel_stream_timer(TimerRef),
Tuple#stream{is_alive=false, timer_ref=undefined};
_ ->
Tuple
end || Tuple = #stream{ref=Ref} <- Streams],
end || Tuple = #stream{ref=Ref, timer_ref=TimerRef} <- Streams],
State#http_state{streams=Streams2}.

end_stream(State=#http_state{streams=[_|Tail]}) ->
end_stream(State=#http_state{streams=[#stream{timer_ref=TimerRef}|Tail]}) ->
cancel_stream_timer(TimerRef),
State#http_state{in=head, streams=Tail}.

start_stream_timer(_StreamRef, infinity) ->
undefined;
start_stream_timer(StreamRef, Timeout) ->
erlang:start_timer(Timeout, self(), {?MODULE, stream_timeout, StreamRef}).

cancel_stream_timer(undefined) ->
ok;
cancel_stream_timer(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
ok.

%% Websocket upgrade.

ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState)
@@ -999,7 +1033,8 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
{state, new_stream(State#http_state{connection=Conn, out=Out},
#websocket{ref=StreamRef, reply_to=ReplyTo, key=Key,
extensions=GunExtensions, opts=WsOpts},
ReplyTo, <<"GET">>, Authority, Path, InitialFlow)};
ReplyTo, <<"GET">>, Authority, Path, InitialFlow,
infinity)};
Error={error, _} ->
Error
end,
88 changes: 68 additions & 20 deletions src/gun_http2.erl
Original file line number Diff line number Diff line change
@@ -27,10 +27,10 @@
-export([closing/4]).
-export([close/4]).
-export([keepalive/3]).
-export([headers/12]).
-export([request/13]).
-export([headers/13]).
-export([request/14]).
-export([data/7]).
-export([connect/9]).
-export([connect/10]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
@@ -79,7 +79,10 @@
handler_state :: undefined | gun_content_handler:state(),

%% CONNECT tunnel.
tunnel :: undefined | #tunnel{}
tunnel :: undefined | #tunnel{},

%% Request timeout.
timer_ref :: undefined | reference()
}).

-record(http2_state, {
@@ -934,8 +937,8 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport, pings_unack=Pin

headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0,
Timeout) when is_reference(StreamRef) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers, CookieStore} = prepare_headers(
@@ -958,8 +961,10 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
EvHandlerState = EvHandler:request_headers(RequestEvent,
EvHandlerState1),
InitialFlow = initial_flow(InitialFlow0, Opts),
TimerRef = start_stream_timer(StreamRef, Timeout),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
flow=InitialFlow, authority=Authority, path=Path},
flow=InitialFlow, authority=Authority, path=Path,
timer_ref=TimerRef},
{{state, create_stream(State#http2_state{
http2_machine=HTTP2Machine}, Stream)}, CookieStore,
EvHandlerState};
@@ -968,14 +973,15 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
end;
%% Tunneled request.
headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0,
Timeout) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
{Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
{ResCommands, EvHandlerState} = tunnel_commands(Commands, Stream,
State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
@@ -990,8 +996,8 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,

request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0,
Timeout) when is_reference(StreamRef) ->
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
@@ -1020,8 +1026,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
ok ->
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
InitialFlow = initial_flow(InitialFlow0, Opts),
TimerRef = start_stream_timer(StreamRef, Timeout),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path=Path},
authority=Authority, path=Path, timer_ref=TimerRef},
State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream),
case IsFin of
fin ->
@@ -1033,21 +1040,26 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
nofin ->
{StateOrError, EvHandlerStateRet} = maybe_send_data(
State, StreamID, fin, Body, EvHandler, EvHandlerState),
case StateOrError of
{state, _} -> ok;
{error, _} -> cancel_stream_timer(TimerRef)
end,
{StateOrError, CookieStore, EvHandlerStateRet}
end;
Error={error, _} ->
{Error, CookieStore, EvHandlerState1}
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0,
Timeout) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
{Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
@@ -1221,7 +1233,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
case Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)) of
ok ->
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
{#stream{ref=StreamRef, reply_to=ReplyTo, timer_ref=TimerRef}, State} ->
cancel_stream_timer(TimerRef),
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError},
{state, State};
error ->
@@ -1234,7 +1247,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0,
EvHandler, EvHandlerState0)
EvHandler, EvHandlerState0, Timeout)
when is_reference(StreamRef) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
@@ -1281,23 +1294,25 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
TimerRef = start_stream_timer(StreamRef, Timeout),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
flow=InitialFlow, authority=Authority, path= <<>>,
tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
tunnel=#tunnel{destination=Destination, info=TunnelInfo},
timer_ref=TimerRef},
{{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState1}
end;
%% Tunneled request.
connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
EvHandler, EvHandlerState0) ->
EvHandler, EvHandlerState0, Timeout) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo Should we send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
{Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef,
ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
EvHandler, EvHandlerState0),
EvHandler, EvHandlerState0, Timeout),
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
@@ -1347,6 +1362,22 @@ cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0)
{[], EvHandlerState0}
end.

timeout(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, {?MODULE, stream_timeout, StreamRef}, TRef) ->
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID, reply_to=ReplyTo, timer_ref=TRef} ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
case Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)) of
ok ->
error_stream_timeout(State, StreamRef, ReplyTo),
{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine},
StreamID)};
Error={error, _} ->
Error
end;
%% We ignore timeout events for streams that no longer exist.
error ->
{state, State}
end;
timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) ->
case cow_http2_machine:timeout(Name, TRef, HTTP2Machine0) of
{ok, HTTP2Machine} ->
@@ -1526,6 +1557,11 @@ connection_error(#http2_state{socket=Socket, transport=Transport,

%% Stream functions.

error_stream_timeout(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {timeout,
"The stream has timed out."}},
ok.

error_stream_closed(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
@@ -1592,8 +1628,20 @@ maybe_delete_stream(State, _, _, _) ->
State.

delete_stream(State=#http2_state{streams=Streams, stream_refs=Refs}, StreamID) ->
#{StreamID := #stream{ref=StreamRef}} = Streams,
#{StreamID := #stream{ref=StreamRef, timer_ref=TimerRef}} = Streams,
cancel_stream_timer(TimerRef),
State#http2_state{
streams=maps:remove(StreamID, Streams),
stream_refs=maps:remove(StreamRef, Refs)
}.

start_stream_timer(_StreamRef, infinity) ->
undefined;
start_stream_timer(StreamRef, Timeout) ->
erlang:start_timer(Timeout, self(), {?MODULE, stream_timeout, StreamRef}).

cancel_stream_timer(undefined) ->
ok;
cancel_stream_timer(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
ok.
19 changes: 10 additions & 9 deletions src/gun_tunnel.erl
Original file line number Diff line number Diff line change
@@ -24,10 +24,10 @@
-export([closing/4]).
-export([close/4]).
-export([keepalive/3]).
-export([headers/12]).
-export([request/13]).
-export([headers/13]).
-export([request/14]).
-export([data/7]).
-export([connect/9]).
-export([connect/10]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
@@ -288,23 +288,24 @@ keepalive(_State, _EvHandler, EvHandlerState) ->
%% We pass the headers forward and optionally dereference StreamRef.
headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
InitialFlow, CookieStore0, EvHandler, EvHandlerState0,
Timeout) ->
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, StreamRef,
ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}.

%% We pass the request forward and optionally dereference StreamRef.
request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
info=#{origin_host := OriginHost, origin_port := OriginPort}},
StreamRef0, ReplyTo, Method, _Host, _Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout) ->
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, StreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}.

@@ -344,11 +345,11 @@ data(State=#tunnel_state{socket=Socket, transport=Transport,
connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},
protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow,
EvHandler, EvHandlerState0) ->
EvHandler, EvHandlerState0, Timeout) ->
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef,
ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow,
EvHandler, EvHandlerState0),
EvHandler, EvHandlerState0, Timeout),
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, EvHandlerState}.