Skip to content

Commit

Permalink
Simplify client handler (#98)
Browse files Browse the repository at this point in the history
* Remove handle_connected and handle_published callbacks from the client handler behaviour. Make handle_new_client return a struct that defines the behaviour of the newly connected client. Rename cliet_handler_for_source.ex into client_handler_impl.ex to match the module name. Update the tests and examples accordingly

* Move common RTMP functionalities modules from rtmp/source/ to the rtmp/ directory

* Improve documentation

* Use a childspec-like approach to define the behaviour of a client handler returned from the handle_new_client

* Add defaults to RTMP server options
  • Loading branch information
varsill authored Sep 19, 2024
1 parent 3258ba2 commit 088b06e
Show file tree
Hide file tree
Showing 36 changed files with 68 additions and 91 deletions.
6 changes: 2 additions & 4 deletions examples/source_with_standalone_server.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ parent_process_pid = self()

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
Membrane.RTMP.Source.ClientHandlerImpl
end

# Run the standalone server
{:ok, server} =
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()},
port: port,
use_ssl?: false,
handle_new_client: handle_new_client,
client_timeout: 5_000
handle_new_client: handle_new_client
)

app = "app"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,16 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do

@behaviour Membrane.RTMPServer.ClientHandler

defstruct [:controlling_process]
defstruct []

@impl true
def handle_init(opts) do
def handle_init(_opts) do
%{
source_pid: nil,
buffered: [],
app: nil,
stream_key: nil,
controlling_process: opts.controlling_process
buffered: []
}
end

@impl true
def handle_connected(connected_msg, state) do
%{state | app: connected_msg.app}
end

@impl true
def handle_stream_published(publish_msg, state) do
%{state | stream_key: publish_msg.stream_key}
end

@impl true
def handle_info({:send_me_data, source_pid}, state) do
buffers_to_send = Enum.reverse(state.buffered)
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane_rtmp_plugin/rtmp/source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ defmodule Membrane.RTMP.Source do

handle_new_client = fn client_ref, app, stream_key ->
send(parent_pid, {:client_ref, client_ref, app, stream_key})
__MODULE__.ClientHandlerImpl
end

{:ok, server_pid} =
Membrane.RTMPServer.start_link(
handler: %__MODULE__.ClientHandlerImpl{controlling_process: self()},
port: port,
use_ssl?: use_ssl?,
handle_new_client: handle_new_client,
client_timeout: 100
client_timeout: Membrane.Time.milliseconds(100)
)

state = %{state | app: app, stream_key: stream_key, server: server_pid}
Expand Down
52 changes: 27 additions & 25 deletions lib/membrane_rtmp_plugin/rtmp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ defmodule Membrane.RTMPServer do
If no data is demanded within the client_timeout period, TCP socket is closed.
Options:
- client_timeout: Time (ms) after which an unused client connection is automatically closed.
- handle_new_client: An anonymous function called when a new client connects.
- handle_new_client: An anonymous function called when a new client connects.
It receives the client reference, `app` and `stream_key`, allowing custom processing,
like sending the reference to another process. If it's not provided, default implementation is used:
{:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMPServer.start_link().
like sending the reference to another process. The function should return a `t:#{inspect(__MODULE__)}.client_behaviour_spec/0`
which defines how the client should behave.
- port: Port on which RTMP server will listen. Defaults to 1935.
- use_ssl?: If true, SSL socket (for RTMPS) will be used. Othwerwise, TCP socket (for RTMP) will be used. Defaults to false.
- client_timeout: Time after which an unused client connection is automatically closed. Defaults to 5 seconds.
- name: If not nil, value of this field will be used as a name under which the server's process will be registered. Defaults to nil.
"""
use GenServer

Expand All @@ -21,17 +24,29 @@ defmodule Membrane.RTMPServer do
Defines options for the RTMP server.
"""
@type t :: [
handler: ClientHandler.t(),
port: :inet.port_number(),
use_ssl?: boolean(),
name: atom() | nil,
handle_new_client:
(client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
any())
| nil,
handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() ->
client_behaviour_spec()),
client_timeout: Membrane.Time.t()
]

@default_options %{
port: 1935,
use_ssl?: false,
name: nil,
client_timeout: Membrane.Time.seconds(5)
}

@typedoc """
A type representing how a client handler should behave.
If just a tuple is passed, the second element of that tuple is used as
an input argument of the `c:#{inspect(ClientHandler)}.handle_init/1`. Otherwise, an empty
map is passed to the `c:#{inspect(ClientHandler)}.handle_init/1`.
"""
@type client_behaviour_spec :: ClientHandler.t() | {ClientHandler.t(), opts :: any()}

@type server_identifier :: pid() | atom()

@doc """
Expand All @@ -40,23 +55,10 @@ defmodule Membrane.RTMPServer do
@spec start_link(server_options :: t()) :: GenServer.on_start()
def start_link(server_options) do
gen_server_opts = if server_options[:name] == nil, do: [], else: [name: server_options[:name]]
server_options_map = Enum.into(server_options, %{})
server_options_map = Map.merge(@default_options, server_options_map)

server_options = Enum.into(server_options, %{})

server_options =
if server_options[:handle_new_client] == nil do
parent_process_pid = self()

callback = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

Map.put(server_options, :handle_new_client, callback)
else
server_options
end

GenServer.start_link(__MODULE__, server_options, gen_server_opts)
GenServer.start_link(__MODULE__, server_options_map, gen_server_opts)
end

@doc """
Expand Down
71 changes: 32 additions & 39 deletions lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,40 @@ defmodule Membrane.RTMPServer.ClientHandler do
require Logger
alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser}

@typedoc """
A type representing a module which implements `#{inspect(__MODULE__)}` behaviour.
"""
@type t :: module()

@typedoc """
Type representing the user defined state of the client handler.
"""
@type t :: term()
@type state :: any()

@doc """
The callback invoked once the client handler is created.
It should return the initial state of the client handler.
"""
@callback handle_init(any()) :: t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()`
message.
"""
@callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) ::
t()

@doc """
The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()`
message.
"""
@callback handle_stream_published(
publish_msg :: Membrane.RTMP.Messages.Publish.t(),
state :: t()
) :: t()
@callback handle_init(any()) :: state()

@doc """
The callback invoked when new piece of data is received from a given client.
"""
@callback handle_data_available(payload :: binary(), state :: t()) :: t()
@callback handle_data_available(payload :: binary(), state :: state()) :: state()

@doc """
The callback invoked when the client served by given client handler
stops sending data.
(for instance, when the remote client deletes the stream or
terminates the socket connection)
"""
@callback handle_end_of_stream(state :: t()) :: t()
@callback handle_end_of_stream(state :: state()) :: state()

@doc """
The callback invoked when the client handler receives a message
that is not recognized as an internal message of the client handler.
"""
@callback handle_info(msg :: term(), t()) :: t()
@callback handle_info(msg :: term(), state()) :: state()

@doc """
Makes the client handler ask client for the desired number of buffers
Expand All @@ -73,16 +62,14 @@ defmodule Membrane.RTMPServer.ClientHandler do
message_parser_state = Handshake.init_server() |> MessageParser.init()
message_handler_state = MessageHandler.init(%{socket: opts.socket, use_ssl?: opts.use_ssl?})

%handler_module{} = opts.handler

{:ok,
%{
socket: opts.socket,
use_ssl?: opts.use_ssl?,
message_parser_state: message_parser_state,
message_handler_state: message_handler_state,
handler: handler_module,
handler_state: handler_module.handle_init(opts.handler),
handler: nil,
handler_state: nil,
app: nil,
stream_key: nil,
server: opts.server,
Expand Down Expand Up @@ -163,15 +150,28 @@ defmodule Membrane.RTMPServer.ClientHandler do
%{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} =
message_handler_state

if is_function(state.handle_new_client) do
state.handle_new_client.(self(), state.app, stream_key)
else
if not is_function(state.handle_new_client) do
raise "handle_new_client is not a function"
end

Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout)
{handler_module, opts} =
case state.handle_new_client.(self(), state.app, stream_key) do
{handler_module, opts} -> {handler_module, opts}
handler_module -> {handler_module, %{}}
end

%{state | notified_about_client?: true}
Process.send_after(
self(),
{:client_timeout, state.app, stream_key},
Membrane.Time.as_milliseconds(state.client_timeout, :round)
)

%{
state
| notified_about_client?: true,
handler: handler_module,
handler_state: handler_module.handle_init(opts)
}
else
state
end
Expand Down Expand Up @@ -210,19 +210,12 @@ defmodule Membrane.RTMPServer.ClientHandler do
}

{:connected, connected_msg} ->
new_handler_state =
state.handler.handle_connected(connected_msg, state.handler_state)

%{state | handler_state: new_handler_state, app: connected_msg.app}
%{state | app: connected_msg.app}

{:published, publish_msg} ->
new_handler_state =
state.handler.handle_stream_published(publish_msg, state.handler_state)

%{
state
| handler_state: new_handler_state,
stream_key: publish_msg.stream_key,
| stream_key: publish_msg.stream_key,
published?: true
}
end
Expand Down
1 change: 0 additions & 1 deletion lib/membrane_rtmp_plugin/rtmp_server/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ defmodule Membrane.RTMPServer.Listener do
GenServer.start_link(ClientHandler,
socket: client,
use_ssl?: options.use_ssl?,
handler: options.handler,
server: options.server,
handle_new_client: options.handle_new_client,
client_timeout: options.client_timeout
Expand Down
6 changes: 2 additions & 4 deletions test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,15 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do

handle_new_client = fn client_ref, app, stream_key ->
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
Membrane.RTMP.Source.ClientHandlerImpl
end

{:ok, server_pid} =
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandlerImpl{
controlling_process: self()
},
port: port,
use_ssl?: use_ssl?,
handle_new_client: handle_new_client,
client_timeout: 3_000
client_timeout: Membrane.Time.seconds(3)
)

{:ok, assigned_port} = Membrane.RTMPServer.get_port(server_pid)
Expand Down

0 comments on commit 088b06e

Please sign in to comment.