Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify client handler #98

Merged
merged 9 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ parent_process_pid = self()

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
Membrane.RTMP.Source.ClientHandlerImpl
end

# Run the standalone server
{:ok, server} =
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()},
port: port,
use_ssl?: false,
handle_new_client: handle_new_client,
client_timeout: 5_000
handle_new_client: handle_new_client
)

app = "app"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,16 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do

@behaviour Membrane.RTMPServer.ClientHandler

defstruct [:controlling_process]
defstruct []

@impl true
def handle_init(opts) do
def handle_init(_opts) do
%{
source_pid: nil,
buffered: [],
app: nil,
stream_key: nil,
controlling_process: opts.controlling_process
buffered: []
}
end

@impl true
def handle_connected(connected_msg, state) do
%{state | app: connected_msg.app}
end

@impl true
def handle_stream_published(publish_msg, state) do
%{state | stream_key: publish_msg.stream_key}
end

@impl true
def handle_info({:send_me_data, source_pid}, state) do
buffers_to_send = Enum.reverse(state.buffered)
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ defmodule Membrane.RTMP.Source do

handle_new_client = fn client_ref, app, stream_key ->
send(parent_pid, {:client_ref, client_ref, app, stream_key})
__MODULE__.ClientHandlerImpl
end

{:ok, server_pid} =
Membrane.RTMPServer.start_link(
handler: %__MODULE__.ClientHandlerImpl{controlling_process: self()},
port: port,
use_ssl?: use_ssl?,
handle_new_client: handle_new_client,
client_timeout: 100
client_timeout: Membrane.Time.milliseconds(100)
)

state = %{state | app: app, stream_key: stream_key, server: server_pid}
Expand Down
52 changes: 27 additions & 25 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ defmodule Membrane.RTMPServer do
If no data is demanded within the client_timeout period, TCP socket is closed.

Options:
- client_timeout: Time (ms) after which an unused client connection is automatically closed.
- handle_new_client: An anonymous function called when a new client connects.
- handle_new_client: An anonymous function called when a new client connects.
It receives the client reference, `app` and `stream_key`, allowing custom processing,
like sending the reference to another process. If it's not provided, default implementation is used:
{:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMPServer.start_link().
like sending the reference to another process. The function should return a `t:#{inspect(__MODULE__)}.client_behaviour_spec/0`
which defines how the client should behave.
- port: Port on which RTMP server will listen. Defaults to 1935.
- use_ssl?: If true, SSL socket (for RTMPS) will be used. Othwerwise, TCP socket (for RTMP) will be used. Defaults to false.
- client_timeout: Time after which an unused client connection is automatically closed. Defaults to 5 seconds.
- name: If not nil, value of this field will be used as a name under which the server's process will be registered. Defaults to nil.
"""
use GenServer

Expand All @@ -21,17 +24,29 @@ defmodule Membrane.RTMPServer do
Defines options for the RTMP server.
"""
@type t :: [
handler: ClientHandler.t(),
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil,
handle_new_client:
(client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
any())
| nil,
handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
client_behaviour_spec()),
client_timeout: Membrane.Time.t()
]

@default_options %{
port: 1935,
use_ssl?: false,
name: nil,
client_timeout: Membrane.Time.seconds(5)
}

@typedoc """
A type representing how a client handler should behave.
If just a tuple is passed, the second element of that tuple is used as
an input argument of the `c:#{inspect(ClientHandler)}.handle_init/1`. Otherwise, an empty
map is passed to the `c:#{inspect(ClientHandler)}.handle_init/1`.
"""
@type client_behaviour_spec :: ClientHandler.t() | {ClientHandler.t(), opts :: any()}

@type server_identifier :: pid() | atom()

@doc """
Expand All @@ -40,23 +55,10 @@ defmodule Membrane.RTMPServer do
@spec start_link(server_options :: t()) :: GenServer.on_start()
def start_link(server_options) do
gen_server_opts = if server_options[:name] == nil, do: [], else: [name: server_options[:name]]
server_options_map = Enum.into(server_options, %{})
server_options_map = Map.merge(@default_options, server_options_map)

server_options = Enum.into(server_options, %{})

server_options =
if server_options[:handle_new_client] == nil do
parent_process_pid = self()

callback = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

Map.put(server_options, :handle_new_client, callback)
else
server_options
end

GenServer.start_link(__MODULE__, server_options, gen_server_opts)
GenServer.start_link(__MODULE__, server_options_map, gen_server_opts)
end

@doc """
Expand Down
71 changes: 32 additions & 39 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,40 @@ defmodule Membrane.RTMPServer.ClientHandler do
require Logger
alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser}

@typedoc """
A type representing a module which implements `#{inspect(__MODULE__)}` behaviour.
"""
@type t :: module()

@typedoc """
Type representing the user defined state of the client handler.
"""
@type t :: term()
@type state :: any()

@doc """
The callback invoked once the client handler is created.
It should return the initial state of the client handler.
"""
@callback handle_init(any()) :: t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()`
message.
"""
@callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) ::
t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()`
message.
"""
@callback handle_stream_published(
publish_msg :: Membrane.RTMP.Messages.Publish.t(),
state :: t()
) :: t()
@callback handle_init(any()) :: state()

@doc """
The callback invoked when new piece of data is received from a given client.
"""
@callback handle_data_available(payload :: binary(), state :: t()) :: t()
@callback handle_data_available(payload :: binary(), state :: state()) :: state()

@doc """
The callback invoked when the client served by given client handler
stops sending data.
(for instance, when the remote client deletes the stream or
terminates the socket connection)
"""
@callback handle_end_of_stream(state :: t()) :: t()
@callback handle_end_of_stream(state :: state()) :: state()

@doc """
The callback invoked when the client handler receives a message
that is not recognized as an internal message of the client handler.
"""
@callback handle_info(msg :: term(), t()) :: t()
@callback handle_info(msg :: term(), state()) :: state()

@doc """
Makes the client handler ask client for the desired number of buffers
Expand All @@ -73,16 +62,14 @@ defmodule Membrane.RTMPServer.ClientHandler do
message_parser_state = Handshake.init_server() |> MessageParser.init()
message_handler_state = MessageHandler.init(%{socket: opts.socket, use_ssl?: opts.use_ssl?})

%handler_module{} = opts.handler

{:ok,
%{
socket: opts.socket,
use_ssl?: opts.use_ssl?,
message_parser_state: message_parser_state,
message_handler_state: message_handler_state,
handler: handler_module,
handler_state: handler_module.handle_init(opts.handler),
handler: nil,
handler_state: nil,
app: nil,
stream_key: nil,
server: opts.server,
Expand Down Expand Up @@ -163,15 +150,28 @@ defmodule Membrane.RTMPServer.ClientHandler do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} =
message_handler_state

if is_function(state.handle_new_client) do
state.handle_new_client.(self(), state.app, stream_key)
else
if not is_function(state.handle_new_client) do
raise "handle_new_client is not a function"
end

Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout)
{handler_module, opts} =
case state.handle_new_client.(self(), state.app, stream_key) do
{handler_module, opts} -> {handler_module, opts}
handler_module -> {handler_module, %{}}
end

%{state | notified_about_client?: true}
Process.send_after(
self(),
{:client_timeout, state.app, stream_key},
Membrane.Time.as_milliseconds(state.client_timeout, :round)
)

%{
state
| notified_about_client?: true,
handler: handler_module,
handler_state: handler_module.handle_init(opts)
}
else
state
end
Expand Down Expand Up @@ -210,19 +210,12 @@ defmodule Membrane.RTMPServer.ClientHandler do
}

{:connected, connected_msg} ->
new_handler_state =
state.handler.handle_connected(connected_msg, state.handler_state)

%{state | handler_state: new_handler_state, app: connected_msg.app}
%{state | app: connected_msg.app}

{:published, publish_msg} ->
new_handler_state =
state.handler.handle_stream_published(publish_msg, state.handler_state)

%{
state
| handler_state: new_handler_state,
stream_key: publish_msg.stream_key,
| stream_key: publish_msg.stream_key,
published?: true
}
end
Expand Down
1 change: 0 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ defmodule Membrane.RTMPServer.Listener do
GenServer.start_link(ClientHandler,
socket: client,
use_ssl?: options.use_ssl?,
handler: options.handler,
server: options.server,
handle_new_client: options.handle_new_client,
client_timeout: options.client_timeout
Expand Down
6 changes: 2 additions & 4 deletions test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,15 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
Membrane.RTMP.Source.ClientHandlerImpl
end

{:ok, server_pid} =
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandlerImpl{
controlling_process: self()
},
port: port,
use_ssl?: use_ssl?,
handle_new_client: handle_new_client,
client_timeout: 3_000
client_timeout: Membrane.Time.seconds(3)
)

{:ok, assigned_port} = Membrane.RTMPServer.get_port(server_pid)
Expand Down