Skip to content

Commit

Permalink
fix: fixed the proxy could not be released even if the RC was 0
Browse files Browse the repository at this point in the history
  • Loading branch information
lafirest committed Jul 13, 2024
1 parent 4ec0382 commit 2da91fb
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 90 deletions.
1 change: 1 addition & 0 deletions include/esockd_proxy.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
-type get_connection_id_result() ::
%% send decoded packet
{ok, connection_id(), connection_packet(), connection_state()}
| {error, binary()}
| invalid.

-type connection_options() :: #{
Expand Down
123 changes: 76 additions & 47 deletions src/udp_proxy/esockd_udp_proxy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-include("include/esockd_proxy.hrl").

%% API
-export([start_link/3, send/2, close/1]).
-export([start_link/3, send/2, close/1, takeover/2]).

%% gen_server callbacks
-export([
Expand Down Expand Up @@ -50,6 +50,8 @@
connection_mod := connection_module(),
connection_id := connection_id() | undefined,
connection_state := connection_state(),
connection_pid := pid() | undefined,
connection_ref := reference() | undefined,
connection_options := connection_options(),
%% last source's connection active time
last_time := pos_integer(),
Expand All @@ -76,6 +78,10 @@ close(ProxyId) ->
ok
end.

takeover(ProxyId, CId) ->
_ = gen_server:cast(ProxyId, {?FUNCTION_NAME, CId}),
ok.

%%--------------------------------------------------------------------
%%- gen_server callbacks
%%--------------------------------------------------------------------
Expand All @@ -88,7 +94,9 @@ init([Transport, Peer, #{esockd_proxy_opts := Opts} = COpts]) ->
connection_mod => Mod,
connection_options => COpts,
connection_state => esockd_udp_proxy_connection:initialize(Mod, COpts),
connection_id => undefined
connection_id => undefined,
connection_pid => undefined,
connection_ref => undefined
}).

handle_call(close, _From, State) ->
Expand All @@ -105,14 +113,18 @@ handle_cast({send, Data}, #{transport := Transport, peer := Peer} = State) ->
?ERROR_MSG("Send failed, Reason: ~0p", [Reason]),
{stop, {sock_error, Reason}, State}
end;
handle_cast({takeover, CId}, #{connection_id := CId} = State) ->
{stop, {shutdown, takeover}, State};
handle_cast({takeover, _CId}, State) ->
{noreply, State};
handle_cast(Request, State) ->
?ERROR_MSG("Unexpected cast: ~p", [Request]),
{noreply, State}.

handle_info({datagram, _SockPid, Data}, State) ->
{noreply, handle_incoming(Data, State)};
handle_incoming(Data, State);
handle_info({ssl, _Socket, Data}, State) ->
{noreply, handle_incoming(Data, State)};
handle_incoming(Data, State);
handle_info({heartbeat, Span}, #{last_time := LastTime} = State) ->
Now = ?NOW,
case Now - LastTime > Span of
Expand All @@ -127,9 +139,10 @@ handle_info({ssl_error, _Sock, Reason}, State) ->
handle_info({ssl_closed, _Sock}, State) ->
{stop, ssl_closed, socket_exit(State)};
handle_info(
{'DOWN', _, process, _, _Reason},
{'DOWN', _, process, Pid, _Reason},
State
) ->
ct:print(" >>>> conn down, Pid:~p, Reason:~p~n", [Pid, _Reason]),
{stop, {shutdown, connection_closed}, State};
handle_info(Info, State) ->
?ERROR_MSG("Unexpected info: ~p", [Info]),
Expand All @@ -143,9 +156,12 @@ terminate(Reason, #{transport := Transport} = State) ->
false;
connection_closed ->
false;
takeover ->
false;
_ ->
true
end,
ct:print(">>>> esockd:~p terminate:~p~n", [self(), Reason]),
detach(State, Clear).

%%--------------------------------------------------------------------
Expand All @@ -161,11 +177,17 @@ handle_incoming(
case esockd_udp_proxy_connection:get_connection_id(Mod, Transport, Peer, CState, Data) of
{ok, CId, Packet, CState2} ->
dispatch(Mod, CId, Data, Packet, State2#{connection_state := CState2});
{error, Reply} ->
?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
Transport, Peer, Mod
]),
send(Transport, Peer, Reply),
{stop, {shutdown, no_clientid}, State2};
invalid ->
?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
Transport, Peer, Mod
]),
State2
{stop, {shutdown, no_clientid}, State2}
end.

-spec dispatch(
Expand All @@ -183,43 +205,55 @@ dispatch(
Packet,
#{
transport := Transport,
peer := Peer,
connection_state := CState,
connection_options := Opts
connection_state := CState
} =
State
) ->
case lookup(Mod, Transport, Peer, CId, Opts) of
case lookup(CId, State) of
{ok, Pid} ->
Result = attach(CId, State, Pid),
esockd_udp_proxy_connection:dispatch(
Mod, Pid, CState, {Transport, Data, Packet}
),
attach(CId, State);
{noreply, Result};
{error, Reason} ->
?ERROR_MSG("Dispatch failed, Reason:~0p", [Reason]),
State
{noreply, State}
end.

-spec attach(connection_id(), state()) -> state().
attach(CId, #{connection_mod := Mod, connection_id := undefined} = State) ->
-spec attach(connection_id(), state(), pid()) -> state().
attach(CId, #{connection_mod := Mod, connection_id := undefined} = State, Pid) ->
esockd_udp_proxy_db:attach(Mod, CId),
State#{connection_id := CId};
attach(CId, #{connection_id := OldId} = State) when CId =/= OldId ->
State2 = detach(State),
attach(CId, State2);
attach(_CId, State) ->
Ref = erlang:monitor(process, Pid),
State#{connection_id := CId, connection_pid := Pid, connection_ref := Ref};
attach(CId, #{connection_id := OldId} = State, Pid) when CId =/= OldId ->
State2 = detach(State, false),
attach(CId, State2, Pid);
attach(_CId, State, _Pid) ->
State.

-spec detach(state()) -> state().
detach(State) ->
detach(State, true).

-spec detach(state(), boolean()) -> state().
-spec detach(state()) -> state().
detach(#{connection_id := undefined} = State, _Clear) ->
State;
detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState} = State, Clear) ->
case esockd_udp_proxy_db:detach(Mod, CId) of
{Clear, Pid} ->
detach(
#{
connection_id := CId,
connection_pid := Pid,
connection_ref := Ref,
connection_mod := Mod,
connection_state := CState
} = State,
Clear
) ->
erlang:demonitor(Ref),

Result = esockd_udp_proxy_db:detach(Mod, CId),
ct:print(">>>> detach:~p result:~p, pid:~p~n", [self(), Result, Pid]),
case Clear andalso Result of
true ->
case erlang:is_process_alive(Pid) of
true ->
esockd_udp_proxy_connection:close(Mod, Pid, CState);
Expand All @@ -229,7 +263,7 @@ detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState
_ ->
ok
end,
State#{connection_id := undefined}.
State#{connection_id := undefined, connection_pid := undefined, connection_ref := undefined}.

-spec socket_exit(state()) -> state().
socket_exit(State) ->
Expand All @@ -240,28 +274,23 @@ heartbeat(Span) ->
erlang:send_after(timer:seconds(Span), self(), {?FUNCTION_NAME, Span}),
ok.

-spec lookup(
connection_module(),
proxy_transport(),
peer(),
connection_id(),
connection_options()
) -> {ok, pid()} | {error, Reason :: term()}.
lookup(Mod, Transport, Peer, CId, Opts) ->
case esockd_udp_proxy_db:lookup(Mod, CId) of
{ok, _} = Ok ->
Ok;
undefined ->
case esockd_udp_proxy_connection:create(Mod, Transport, Peer, Opts) of
{ok, Pid} ->
esockd_udp_proxy_db:insert(Mod, CId, Pid),
_ = erlang:monitor(process, Pid),
{ok, Pid};
ignore ->
{error, ignore};
Error ->
Error
end
-spec lookup(connection_id(), state()) -> {ok, pid()} | {error, Reason :: term()}.
lookup(_CId, #{connection_pid := Pid}) when is_pid(Pid) ->
{ok, Pid};
lookup(CId, #{
connection_pid := undefined,
connection_mod := Mod,
transport := Transport,
peer := Peer,
connection_options := Opts
}) ->
case esockd_udp_proxy_connection:find_or_create(Mod, CId, Transport, Peer, Opts) of
{ok, Pid} ->
{ok, Pid};
ignore ->
{error, ignore};
Error ->
Error
end.

-spec send(proxy_transport(), peer(), binary()) -> _.
Expand Down
9 changes: 5 additions & 4 deletions src/udp_proxy/esockd_udp_proxy_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

-export([
initialize/2,
create/4,
find_or_create/5,
get_connection_id/5,
dispatch/4,
close/3
Expand All @@ -34,7 +34,8 @@
-callback initialize(connection_options()) -> connection_state().

%% Create new connection
-callback create(proxy_transport(), peer(), connection_options()) -> gen_server:start_ret().
-callback find_or_create(connection_id(), proxy_transport(), peer(), connection_options()) ->
gen_server:start_ret().

%% Find routing information
-callback get_connection_id(
Expand All @@ -54,8 +55,8 @@
initialize(Mod, Opts) ->
Mod:initialize(Opts).

create(Mod, Transport, Peer, Opts) ->
Mod:create(Transport, Peer, Opts).
find_or_create(Mod, CId, Transport, Peer, Opts) ->
Mod:find_or_create(CId, Transport, Peer, Opts).

get_connection_id(Mod, Transport, Peer, State, Data) ->
Mod:get_connection_id(Transport, Peer, State, Data).
Expand Down
58 changes: 19 additions & 39 deletions src/udp_proxy/esockd_udp_proxy_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
%% API
-export([
start_link/0,
insert/3,
attach/2,
detach/2,
lookup/2
detach/2
]).

%% gen_server callbacks
Expand All @@ -42,14 +40,11 @@

-record(connection, {
id :: ?ID(connection_module(), connection_id()),
%% the connection pid
pid :: pid(),
%% Reference Counter
count :: non_neg_integer()
proxy :: pid()
}).

-define(TAB, esockd_udp_proxy_db).
-define(MINIMUM_VAL, -2147483647).

%%--------------------------------------------------------------------
%%- API
Expand All @@ -58,42 +53,27 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec insert(connection_module(), connection_id(), pid()) -> boolean().
insert(Mod, CId, Pid) ->
ets:insert_new(?TAB, #connection{
id = ?ID(Mod, CId),
pid = Pid,
count = 0
}).

-spec attach(connection_module(), connection_id()) -> integer().
attach(Mod, CId) ->
ets:update_counter(?TAB, ?ID(Mod, CId), {#connection.count, 1}).

-spec detach(connection_module(), connection_id()) -> {Clear :: true, connection_state()} | false.
ID = ?ID(Mod, CId),
case ets:lookup(?TAB, ID) of
[] ->
ok;
[#connection{proxy = ProxyId}] ->
esockd_udp_proxy:takeover(ProxyId, CId)
end,
ets:insert(?TAB, #connection{id = ID, proxy = self()}).

-spec detach(connection_module(), connection_id()) -> boolean().
detach(Mod, CId) ->
Id = ?ID(Mod, CId),
RC = ets:update_counter(?TAB, Id, {#connection.count, -1, 0, ?MINIMUM_VAL}),
if
RC < 0 ->
case ets:lookup(?TAB, Id) of
[#connection{pid = Pid}] ->
ets:delete(?TAB, Id),
{true, Pid};
_ ->
false
end;
true ->
false
end.

-spec lookup(connection_module(), connection_id()) -> {ok, pid()} | undefined.
lookup(Mod, CId) ->
case ets:lookup(?TAB, ?ID(Mod, CId)) of
[#connection{pid = Pid}] ->
{ok, Pid};
ProxyId = self(),
ID = ?ID(Mod, CId),
case ets:lookup(?TAB, ID) of
[#connection{proxy = ProxyId}] ->
ets:delete(?TAB, ID),
true;
_ ->
undefined
false
end.

%%--------------------------------------------------------------------
Expand Down

0 comments on commit 2da91fb

Please sign in to comment.