From 94f9c34d9ed5aeb2db7125587ce32022fecef18b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Wed, 18 Sep 2024 11:31:52 +0200 Subject: [PATCH 1/9] Remove handle_connected and handle_published callbacks from the client handler behaviour. Make handle_new_client return a struct that defines the behaviour of the newly connected client. Rename cliet_handler_for_source.ex into client_handler_impl.ex to match the module name. Update the tests and examples accordingly --- examples/source_with_standalone_server.exs | 2 +- ...r_for_source.ex => client_handler_impl.ex} | 19 ++----- .../rtmp/source/source.ex | 2 +- .../rtmp_server/client_handler.ex | 52 ++++++------------- .../rtmp_server/listener.ex | 1 - .../rtmp_source_bin_test.exs | 4 +- 6 files changed, 23 insertions(+), 57 deletions(-) rename lib/membrane_rtmp_plugin/rtmp/source/{client_handler_for_source.ex => client_handler_impl.ex} (74%) diff --git a/examples/source_with_standalone_server.exs b/examples/source_with_standalone_server.exs index 1f22a78..66566b3 100644 --- a/examples/source_with_standalone_server.exs +++ b/examples/source_with_standalone_server.exs @@ -53,12 +53,12 @@ parent_process_pid = self() handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) + %Membrane.RTMP.Source.ClientHandlerImpl{} end # Run the standalone server {:ok, server} = Membrane.RTMPServer.start_link( - handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: false, handle_new_client: handle_new_client, diff --git a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex similarity index 74% rename from lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex rename to lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex index 6039e11..ad5164b 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_impl.ex @@ -6,29 +6,16 @@ defmodule Membrane.RTMP.Source.ClientHandlerImpl do @behaviour Membrane.RTMPServer.ClientHandler - defstruct [:controlling_process] + defstruct [] @impl true - def handle_init(opts) do + def handle_init(_opts) do %{ source_pid: nil, - buffered: [], - app: nil, - stream_key: nil, - controlling_process: opts.controlling_process + buffered: [] } end - @impl true - def handle_connected(connected_msg, state) do - %{state | app: connected_msg.app} - end - - @impl true - def handle_stream_published(publish_msg, state) do - %{state | stream_key: publish_msg.stream_key} - end - @impl true def handle_info({:send_me_data, source_pid}, state) do buffers_to_send = Enum.reverse(state.buffered) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index 41881fd..f1c063f 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -86,11 +86,11 @@ defmodule Membrane.RTMP.Source do handle_new_client = fn client_ref, app, stream_key -> send(parent_pid, {:client_ref, client_ref, app, stream_key}) + %__MODULE__.ClientHandlerImpl{} end {:ok, server_pid} = Membrane.RTMPServer.start_link( - handler: %__MODULE__.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client, diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 5a08a66..f173b42 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -23,22 +23,6 @@ defmodule Membrane.RTMPServer.ClientHandler do """ @callback handle_init(any()) :: t() - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()` - message. - """ - @callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) :: - t() - - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()` - message. - """ - @callback handle_stream_published( - publish_msg :: Membrane.RTMP.Messages.Publish.t(), - state :: t() - ) :: t() - @doc """ The callback invoked when new piece of data is received from a given client. """ @@ -73,16 +57,14 @@ defmodule Membrane.RTMPServer.ClientHandler do message_parser_state = Handshake.init_server() |> MessageParser.init() message_handler_state = MessageHandler.init(%{socket: opts.socket, use_ssl?: opts.use_ssl?}) - %handler_module{} = opts.handler - {:ok, %{ socket: opts.socket, use_ssl?: opts.use_ssl?, message_parser_state: message_parser_state, message_handler_state: message_handler_state, - handler: handler_module, - handler_state: handler_module.handle_init(opts.handler), + handler: nil, + handler_state: nil, app: nil, stream_key: nil, server: opts.server, @@ -163,15 +145,22 @@ defmodule Membrane.RTMPServer.ClientHandler do %{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} = message_handler_state - if is_function(state.handle_new_client) do - state.handle_new_client.(self(), state.app, stream_key) - else - raise "handle_new_client is not a function" - end + handler = + if is_function(state.handle_new_client) do + state.handle_new_client.(self(), state.app, stream_key) + else + raise "handle_new_client is not a function" + end + %handler_module{} = handler Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout) - %{state | notified_about_client?: true} + %{ + state + | notified_about_client?: true, + handler: handler_module, + handler_state: handler_module.handle_init(handler) + } else state end @@ -210,19 +199,12 @@ defmodule Membrane.RTMPServer.ClientHandler do } {:connected, connected_msg} -> - new_handler_state = - state.handler.handle_connected(connected_msg, state.handler_state) - - %{state | handler_state: new_handler_state, app: connected_msg.app} + %{state | app: connected_msg.app} {:published, publish_msg} -> - new_handler_state = - state.handler.handle_stream_published(publish_msg, state.handler_state) - %{ state - | handler_state: new_handler_state, - stream_key: publish_msg.stream_key, + | stream_key: publish_msg.stream_key, published?: true } end diff --git a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex index 6cf2865..a24312e 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex @@ -51,7 +51,6 @@ defmodule Membrane.RTMPServer.Listener do GenServer.start_link(ClientHandler, socket: client, use_ssl?: options.use_ssl?, - handler: options.handler, server: options.server, handle_new_client: options.handle_new_client, client_timeout: options.client_timeout diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index 9d5d489..9b9bd31 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -222,13 +222,11 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) + %Membrane.RTMP.Source.ClientHandlerImpl{} end {:ok, server_pid} = Membrane.RTMPServer.start_link( - handler: %Membrane.RTMP.Source.ClientHandlerImpl{ - controlling_process: self() - }, port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client, From 1c71f6c9367387b2aa45f060754b1687e3d820bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Wed, 18 Sep 2024 11:35:35 +0200 Subject: [PATCH 2/9] Move common RTMP functionalities modules from rtmp/source/ to the rtmp/ directory --- lib/membrane_rtmp_plugin/rtmp/{source => }/amf0/encoder.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/amf0/parser.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/amf3/parser.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/handshake.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/handshake/step.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/header.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/message.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/message_handler.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/message_parser.ex | 0 .../rtmp/{source => }/messages/acknowledgement.ex | 0 .../rtmp/{source => }/messages/additional_media.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/messages/anonymous.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/messages/audio.ex | 0 .../rtmp/{source => }/messages/command/connect.ex | 0 .../rtmp/{source => }/messages/command/create_stream.ex | 0 .../rtmp/{source => }/messages/command/delete_stream.ex | 0 .../rtmp/{source => }/messages/command/fc_publish.ex | 0 .../rtmp/{source => }/messages/command/publish.ex | 0 .../rtmp/{source => }/messages/command/release_stream.ex | 0 .../rtmp/{source => }/messages/on_expect_additional_media.ex | 0 .../rtmp/{source => }/messages/on_meta_data.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/messages/serializer.ex | 0 .../rtmp/{source => }/messages/set_chunk_size.ex | 0 .../rtmp/{source => }/messages/set_data_frame.ex | 0 .../rtmp/{source => }/messages/set_peer_bandwidth.ex | 0 .../rtmp/{source => }/messages/user_control.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/messages/video.ex | 0 .../rtmp/{source => }/messages/window_acknowledgement.ex | 0 lib/membrane_rtmp_plugin/rtmp/{source => }/responses.ex | 0 29 files changed, 0 insertions(+), 0 deletions(-) rename lib/membrane_rtmp_plugin/rtmp/{source => }/amf0/encoder.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/amf0/parser.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/amf3/parser.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/handshake.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/handshake/step.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/header.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/message.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/message_handler.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/message_parser.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/acknowledgement.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/additional_media.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/anonymous.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/audio.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/command/connect.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/command/create_stream.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/command/delete_stream.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/command/fc_publish.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/command/publish.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/command/release_stream.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/on_expect_additional_media.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/on_meta_data.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/serializer.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/set_chunk_size.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/set_data_frame.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/set_peer_bandwidth.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/user_control.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/video.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/messages/window_acknowledgement.ex (100%) rename lib/membrane_rtmp_plugin/rtmp/{source => }/responses.ex (100%) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf0/encoder.ex b/lib/membrane_rtmp_plugin/rtmp/amf0/encoder.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/amf0/encoder.ex rename to lib/membrane_rtmp_plugin/rtmp/amf0/encoder.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf0/parser.ex b/lib/membrane_rtmp_plugin/rtmp/amf0/parser.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/amf0/parser.ex rename to lib/membrane_rtmp_plugin/rtmp/amf0/parser.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf3/parser.ex b/lib/membrane_rtmp_plugin/rtmp/amf3/parser.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/amf3/parser.ex rename to lib/membrane_rtmp_plugin/rtmp/amf3/parser.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/handshake.ex b/lib/membrane_rtmp_plugin/rtmp/handshake.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/handshake.ex rename to lib/membrane_rtmp_plugin/rtmp/handshake.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/handshake/step.ex b/lib/membrane_rtmp_plugin/rtmp/handshake/step.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/handshake/step.ex rename to lib/membrane_rtmp_plugin/rtmp/handshake/step.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/header.ex b/lib/membrane_rtmp_plugin/rtmp/header.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/header.ex rename to lib/membrane_rtmp_plugin/rtmp/header.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message.ex b/lib/membrane_rtmp_plugin/rtmp/message.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/message.ex rename to lib/membrane_rtmp_plugin/rtmp/message.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/message_handler.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex rename to lib/membrane_rtmp_plugin/rtmp/message_handler.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/message_parser.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex rename to lib/membrane_rtmp_plugin/rtmp/message_parser.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/acknowledgement.ex b/lib/membrane_rtmp_plugin/rtmp/messages/acknowledgement.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/acknowledgement.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/acknowledgement.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/additional_media.ex b/lib/membrane_rtmp_plugin/rtmp/messages/additional_media.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/additional_media.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/additional_media.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/anonymous.ex b/lib/membrane_rtmp_plugin/rtmp/messages/anonymous.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/anonymous.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/anonymous.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/audio.ex b/lib/membrane_rtmp_plugin/rtmp/messages/audio.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/audio.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/audio.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/connect.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/connect.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/connect.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/connect.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/create_stream.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/create_stream.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/create_stream.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/create_stream.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/delete_stream.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/delete_stream.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/delete_stream.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/delete_stream.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/fc_publish.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/fc_publish.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/fc_publish.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/fc_publish.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/publish.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/publish.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/publish.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/publish.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/release_stream.ex b/lib/membrane_rtmp_plugin/rtmp/messages/command/release_stream.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/command/release_stream.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/command/release_stream.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/on_expect_additional_media.ex b/lib/membrane_rtmp_plugin/rtmp/messages/on_expect_additional_media.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/on_expect_additional_media.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/on_expect_additional_media.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex b/lib/membrane_rtmp_plugin/rtmp/messages/on_meta_data.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/on_meta_data.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/serializer.ex b/lib/membrane_rtmp_plugin/rtmp/messages/serializer.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/serializer.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/serializer.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_chunk_size.ex b/lib/membrane_rtmp_plugin/rtmp/messages/set_chunk_size.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/set_chunk_size.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/set_chunk_size.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_data_frame.ex b/lib/membrane_rtmp_plugin/rtmp/messages/set_data_frame.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/set_data_frame.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/set_data_frame.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_peer_bandwidth.ex b/lib/membrane_rtmp_plugin/rtmp/messages/set_peer_bandwidth.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/set_peer_bandwidth.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/set_peer_bandwidth.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/user_control.ex b/lib/membrane_rtmp_plugin/rtmp/messages/user_control.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/user_control.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/user_control.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/video.ex b/lib/membrane_rtmp_plugin/rtmp/messages/video.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/video.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/video.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/window_acknowledgement.ex b/lib/membrane_rtmp_plugin/rtmp/messages/window_acknowledgement.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/messages/window_acknowledgement.ex rename to lib/membrane_rtmp_plugin/rtmp/messages/window_acknowledgement.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/responses.ex b/lib/membrane_rtmp_plugin/rtmp/responses.ex similarity index 100% rename from lib/membrane_rtmp_plugin/rtmp/source/responses.ex rename to lib/membrane_rtmp_plugin/rtmp/responses.ex From a93ba84c88decf8e3215077b885b994c4e62fccf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Wed, 18 Sep 2024 12:32:21 +0200 Subject: [PATCH 3/9] Improve documentation --- lib/membrane_rtmp_plugin/rtmp_server.ex | 21 +++---------------- .../rtmp_server/client_handler.ex | 15 ++++++++----- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 4e76dfe..4830cd7 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -8,8 +8,8 @@ defmodule Membrane.RTMPServer do - client_timeout: Time (ms) after which an unused client connection is automatically closed. - handle_new_client: An anonymous function called when a new client connects. It receives the client reference, `app` and `stream_key`, allowing custom processing, - like sending the reference to another process. If it's not provided, default implementation is used: - {:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMPServer.start_link(). + like sending the reference to another process. The function should return a `` struct + which defines how the client should behave. """ use GenServer @@ -21,14 +21,12 @@ defmodule Membrane.RTMPServer do Defines options for the RTMP server. """ @type t :: [ - handler: ClientHandler.t(), port: :inet.port_number(), use_ssl?: boolean(), name: atom() | nil, handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - any()) - | nil, + ClientHandler.t()), client_timeout: Membrane.Time.t() ] @@ -43,19 +41,6 @@ defmodule Membrane.RTMPServer do server_options = Enum.into(server_options, %{}) - server_options = - if server_options[:handle_new_client] == nil do - parent_process_pid = self() - - callback = fn client_ref, app, stream_key -> - send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) - end - - Map.put(server_options, :handle_new_client, callback) - else - server_options - end - GenServer.start_link(__MODULE__, server_options, gen_server_opts) end diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index f173b42..85791eb 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -12,21 +12,26 @@ defmodule Membrane.RTMPServer.ClientHandler do require Logger alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser} + @typedoc """ + A type representing a struct which module implements `#{inspect(__MODULE__)}` behaviour. + """ + @type t :: struct() + @typedoc """ Type representing the user defined state of the client handler. """ - @type t :: term() + @type state :: any() @doc """ The callback invoked once the client handler is created. It should return the initial state of the client handler. """ - @callback handle_init(any()) :: t() + @callback handle_init(any()) :: state() @doc """ The callback invoked when new piece of data is received from a given client. """ - @callback handle_data_available(payload :: binary(), state :: t()) :: t() + @callback handle_data_available(payload :: binary(), state :: state()) :: state() @doc """ The callback invoked when the client served by given client handler @@ -34,13 +39,13 @@ defmodule Membrane.RTMPServer.ClientHandler do (for instance, when the remote client deletes the stream or terminates the socket connection) """ - @callback handle_end_of_stream(state :: t()) :: t() + @callback handle_end_of_stream(state :: state()) :: state() @doc """ The callback invoked when the client handler receives a message that is not recognized as an internal message of the client handler. """ - @callback handle_info(msg :: term(), t()) :: t() + @callback handle_info(msg :: term(), state()) :: state() @doc """ Makes the client handler ask client for the desired number of buffers From 4eae253231d9ca0708071731b159f09afe41818d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Wed, 18 Sep 2024 13:03:30 +0200 Subject: [PATCH 4/9] Use a childspec-like approach to define the behaviour of a client handler returned from the handle_new_client --- examples/source_with_standalone_server.exs | 2 +- .../rtmp/source/source.ex | 2 +- lib/membrane_rtmp_plugin/rtmp_server.ex | 12 +++++++++-- .../rtmp_server/client_handler.ex | 20 ++++++++++--------- .../rtmp_source_bin_test.exs | 2 +- 5 files changed, 24 insertions(+), 14 deletions(-) diff --git a/examples/source_with_standalone_server.exs b/examples/source_with_standalone_server.exs index 66566b3..2bfdde2 100644 --- a/examples/source_with_standalone_server.exs +++ b/examples/source_with_standalone_server.exs @@ -53,7 +53,7 @@ parent_process_pid = self() handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) - %Membrane.RTMP.Source.ClientHandlerImpl{} + Membrane.RTMP.Source.ClientHandlerImpl end # Run the standalone server diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index f1c063f..0aa5755 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -86,7 +86,7 @@ defmodule Membrane.RTMP.Source do handle_new_client = fn client_ref, app, stream_key -> send(parent_pid, {:client_ref, client_ref, app, stream_key}) - %__MODULE__.ClientHandlerImpl{} + __MODULE__.ClientHandlerImpl end {:ok, server_pid} = diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 4830cd7..e791c11 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -8,7 +8,7 @@ defmodule Membrane.RTMPServer do - client_timeout: Time (ms) after which an unused client connection is automatically closed. - handle_new_client: An anonymous function called when a new client connects. It receives the client reference, `app` and `stream_key`, allowing custom processing, - like sending the reference to another process. The function should return a `` struct + like sending the reference to another process. The function should return a `t:#{inspect(__MODULE__)}.client_behaviour_spec/0` which defines how the client should behave. """ use GenServer @@ -26,10 +26,18 @@ defmodule Membrane.RTMPServer do name: atom() | nil, handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - ClientHandler.t()), + client_behaviour_spec()), client_timeout: Membrane.Time.t() ] + @typedoc """ + A type representing how a client handler should behave. + If just a tuple is passed, the second element of that tuple is used as + an input argument of the `c:#{inspect(ClientHandler)}.handle_init/1`. Otherwise, an empty + map is passed to the `c:#{inspect(ClientHandler)}.handle_init/1`. + """ + @type client_behaviour_spec :: ClientHandler.t() | {ClientHandler.t(), opts :: any()} + @type server_identifier :: pid() | atom() @doc """ diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 85791eb..d3e6a11 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -13,9 +13,9 @@ defmodule Membrane.RTMPServer.ClientHandler do alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser} @typedoc """ - A type representing a struct which module implements `#{inspect(__MODULE__)}` behaviour. + A type representing a module which implements `#{inspect(__MODULE__)}` behaviour. """ - @type t :: struct() + @type t :: module() @typedoc """ Type representing the user defined state of the client handler. @@ -150,21 +150,23 @@ defmodule Membrane.RTMPServer.ClientHandler do %{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} = message_handler_state - handler = - if is_function(state.handle_new_client) do - state.handle_new_client.(self(), state.app, stream_key) - else - raise "handle_new_client is not a function" + if not is_function(state.handle_new_client) do + raise "handle_new_client is not a function" + end + + {handler_module, opts} = + case state.handle_new_client.(self(), state.app, stream_key) do + {handler_module, opts} -> {handler_module, opts} + handler_module -> {handler_module, %{}} end - %handler_module{} = handler Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout) %{ state | notified_about_client?: true, handler: handler_module, - handler_state: handler_module.handle_init(handler) + handler_state: handler_module.handle_init(opts) } else state diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index 9b9bd31..91f04b2 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -222,7 +222,7 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) - %Membrane.RTMP.Source.ClientHandlerImpl{} + Membrane.RTMP.Source.ClientHandlerImpl end {:ok, server_pid} = From 88d6618fc0b11c40aaeef4da01c93b5669d9496b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Wed, 18 Sep 2024 13:08:54 +0200 Subject: [PATCH 5/9] Fix formatting --- lib/membrane_rtmp_plugin/rtmp_server.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index e791c11..8585ea9 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -26,7 +26,7 @@ defmodule Membrane.RTMPServer do name: atom() | nil, handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - client_behaviour_spec()), + client_behaviour_spec()), client_timeout: Membrane.Time.t() ] From 7f7a1c0dfc862d782afe7ab30b81a806ade02f4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Wed, 18 Sep 2024 14:18:35 +0200 Subject: [PATCH 6/9] Fix formatting --- lib/membrane_rtmp_plugin/rtmp_server.ex | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 8585ea9..b7f078b 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -24,9 +24,8 @@ defmodule Membrane.RTMPServer do port: :inet.port_number(), use_ssl?: boolean(), name: atom() | nil, - handle_new_client: - (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - client_behaviour_spec()), + handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> + client_behaviour_spec()), client_timeout: Membrane.Time.t() ] From dfdc3e1d39c54aa5f29321dae1e6dcc4803cfa0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 19 Sep 2024 14:34:28 +0200 Subject: [PATCH 7/9] Add defaults to RTMP server options --- examples/source_with_standalone_server.exs | 4 +--- .../rtmp/source/source.ex | 2 +- lib/membrane_rtmp_plugin/rtmp_server.ex | 23 ++++++++++++++----- .../rtmp_server/client_handler.ex | 6 ++++- .../rtmp_source_bin_test.exs | 2 +- 5 files changed, 25 insertions(+), 12 deletions(-) diff --git a/examples/source_with_standalone_server.exs b/examples/source_with_standalone_server.exs index 2bfdde2..3e1129a 100644 --- a/examples/source_with_standalone_server.exs +++ b/examples/source_with_standalone_server.exs @@ -60,9 +60,7 @@ end {:ok, server} = Membrane.RTMPServer.start_link( port: port, - use_ssl?: false, - handle_new_client: handle_new_client, - client_timeout: 5_000 + handle_new_client: handle_new_client ) app = "app" diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index 0aa5755..c265137 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -94,7 +94,7 @@ defmodule Membrane.RTMP.Source do port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client, - client_timeout: 100 + client_timeout: Membrane.Time.milliseconds(100) ) state = %{state | app: app, stream_key: stream_key, server: server_pid} diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index b7f078b..128446d 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -5,11 +5,14 @@ defmodule Membrane.RTMPServer do If no data is demanded within the client_timeout period, TCP socket is closed. Options: - - client_timeout: Time (ms) after which an unused client connection is automatically closed. - handle_new_client: An anonymous function called when a new client connects. It receives the client reference, `app` and `stream_key`, allowing custom processing, like sending the reference to another process. The function should return a `t:#{inspect(__MODULE__)}.client_behaviour_spec/0` which defines how the client should behave. + - port: Port on which RTMP server will listen. Defaults to 1935. + - use_ssl?: If true, SSL socket (for RTMPS) will be used. Othwerwise, TCP socket (for RTMP) will be used. Defaults to false. + - client_timeout: Time after which an unused client connection is automatically closed. Defaults to 5 seconds. + - name: If not nil, value of this field will be used as a name under which the server's process will be registered. Defaults to nil. """ use GenServer @@ -24,11 +27,19 @@ defmodule Membrane.RTMPServer do port: :inet.port_number(), use_ssl?: boolean(), name: atom() | nil, - handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - client_behaviour_spec()), + handle_new_client: + (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> + client_behaviour_spec()), client_timeout: Membrane.Time.t() ] + @default_options %{ + port: 1935, + use_ssl?: false, + name: nil, + client_timeout: Membrane.Time.seconds(5) + } + @typedoc """ A type representing how a client handler should behave. If just a tuple is passed, the second element of that tuple is used as @@ -45,10 +56,10 @@ defmodule Membrane.RTMPServer do @spec start_link(server_options :: t()) :: GenServer.on_start() def start_link(server_options) do gen_server_opts = if server_options[:name] == nil, do: [], else: [name: server_options[:name]] + server_options_map = Enum.into(server_options, %{}) + server_options_map = Map.merge(@default_options, server_options_map) - server_options = Enum.into(server_options, %{}) - - GenServer.start_link(__MODULE__, server_options, gen_server_opts) + GenServer.start_link(__MODULE__, server_options_map, gen_server_opts) end @doc """ diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index d3e6a11..c7a2847 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -160,7 +160,11 @@ defmodule Membrane.RTMPServer.ClientHandler do handler_module -> {handler_module, %{}} end - Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout) + Process.send_after( + self(), + {:client_timeout, state.app, stream_key}, + Membrane.Time.as_milliseconds(state.client_timeout, :round) + ) %{ state diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index 91f04b2..5729b23 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -230,7 +230,7 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do port: port, use_ssl?: use_ssl?, handle_new_client: handle_new_client, - client_timeout: 3_000 + client_timeout: Membrane.Time.seconds(3) ) {:ok, assigned_port} = Membrane.RTMPServer.get_port(server_pid) From 3cb846de9c17f8f8a334b761794c7b2125be6a50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 19 Sep 2024 14:36:19 +0200 Subject: [PATCH 8/9] Fix formatting --- lib/membrane_rtmp_plugin/rtmp_server.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 128446d..19ce194 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -5,7 +5,7 @@ defmodule Membrane.RTMPServer do If no data is demanded within the client_timeout period, TCP socket is closed. Options: - - handle_new_client: An anonymous function called when a new client connects. + - handle_new_client: An anonymous function called when a new client connects. It receives the client reference, `app` and `stream_key`, allowing custom processing, like sending the reference to another process. The function should return a `t:#{inspect(__MODULE__)}.client_behaviour_spec/0` which defines how the client should behave. From 4e398c1c8038861464285a043c47b8129992dfe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 19 Sep 2024 14:38:21 +0200 Subject: [PATCH 9/9] Fix formatting --- lib/membrane_rtmp_plugin/rtmp_server.ex | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 19ce194..6e87dba 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -27,9 +27,8 @@ defmodule Membrane.RTMPServer do port: :inet.port_number(), use_ssl?: boolean(), name: atom() | nil, - handle_new_client: - (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> - client_behaviour_spec()), + handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> + client_behaviour_spec()), client_timeout: Membrane.Time.t() ]