diff --git a/examples.livemd b/examples.livemd index de7e7a4..7f57541 100644 --- a/examples.livemd +++ b/examples.livemd @@ -18,7 +18,7 @@ Mix.install([{:boombox, path: __DIR__}, :kino]) :inets.start(:httpd, bind_address: ~c"localhost", port: 1234, - document_root: ~c"#{__DIR__}/examples_assets/", + document_root: ~c"#{__DIR__}/examples_assets", server_name: ~c"assets_server", server_root: "/tmp", erl_script_nocache: true @@ -77,6 +77,16 @@ Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: {:webrtc, "ws://loc +## MP4 to HLS + +To receive the stream, visit http://localhost:1234/hls/stream.html after running the cell below + +```elixir +Boombox.run(input: bbb_mp4, output: {:hls, "examples_assets/hls/hls_output"}) +``` + + + ## RTMP to MP4 ```elixir diff --git a/examples_assets/hls/hls_output/.gitignore b/examples_assets/hls/hls_output/.gitignore new file mode 100644 index 0000000..c3d07e5 --- /dev/null +++ b/examples_assets/hls/hls_output/.gitignore @@ -0,0 +1,3 @@ +* +!.gitkeep +!.gitignore diff --git a/examples_assets/hls/hls_output/.gitkeep b/examples_assets/hls/hls_output/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/examples_assets/hls/stream.html b/examples_assets/hls/stream.html new file mode 100644 index 0000000..5406bd9 --- /dev/null +++ b/examples_assets/hls/stream.html @@ -0,0 +1,17 @@ + + + + + +

Membrane Boombox HLS Example

+ + + + + + diff --git a/lib/boombox.ex b/lib/boombox.ex index 7df81df..a2fb0be 100644 --- a/lib/boombox.ex +++ b/lib/boombox.ex @@ -20,6 +20,7 @@ defmodule Boombox do | Path.t() | {:file, file_extension(), Path.t()} | {:webrtc, webrtc_opts()} + | {:hls, Path.t()} @spec run(input: input, output: output) :: :ok def run(opts) do diff --git a/lib/boombox/hls.ex b/lib/boombox/hls.ex new file mode 100644 index 0000000..43065cb --- /dev/null +++ b/lib/boombox/hls.ex @@ -0,0 +1,57 @@ +defmodule Boombox.HLS do + @moduledoc false + + import Membrane.ChildrenSpec + + require Membrane.Pad, as: Pad + alias Boombox.Pipeline.Ready + alias Membrane.Time + + @spec link_output( + Path.t(), + Boombox.Pipeline.track_builders(), + Membrane.ChildrenSpec.t() + ) :: Ready.t() + def link_output(location, track_builders, spec_builder) do + {directory, manifest_name} = + if Path.extname(location) == ".m3u8" do + {Path.dirname(location), Path.basename(location, ".m3u8")} + else + {location, "index"} + end + + spec = + [ + spec_builder, + child( + :hls_sink_bin, + %Membrane.HTTPAdaptiveStream.SinkBin{ + manifest_name: manifest_name, + manifest_module: Membrane.HTTPAdaptiveStream.HLS, + storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{ + directory: directory + }, + hls_mode: :muxed_av + } + ), + Enum.map(track_builders, fn + {:audio, builder} -> + builder + |> child(:hls_out_aac_encoder, Membrane.AAC.FDK.Encoder) + |> via_in(Pad.ref(:input, :audio), + options: [encoding: :AAC, segment_duration: Time.milliseconds(2000)] + ) + |> get_child(:hls_sink_bin) + + {:video, builder} -> + builder + |> via_in(Pad.ref(:input, :video), + options: [encoding: :H264, segment_duration: Time.milliseconds(2000)] + ) + |> get_child(:hls_sink_bin) + end) + ] + + %Ready{actions: [spec: spec]} + end +end diff --git a/lib/boombox/pipeline.ex b/lib/boombox/pipeline.ex index 3f2e98d..ae21e8c 100644 --- a/lib/boombox/pipeline.ex +++ b/lib/boombox/pipeline.ex @@ -24,7 +24,7 @@ defmodule Boombox.Pipeline do require Membrane.Logger - @supported_file_extensions %{".mp4" => :mp4} + @supported_file_extensions %{".mp4" => :mp4, ".m3u8" => :m3u8} @type track_builders :: %{ optional(:audio) => Membrane.ChildrenSpec.t(), @@ -149,6 +149,11 @@ defmodule Boombox.Pipeline do end end + @impl true + def handle_child_notification(:end_of_stream, :hls_sink_bin, _ctx, state) do + {[terminate: :normal], state} + end + @impl true def handle_child_notification(notification, child, _ctx, state) do Membrane.Logger.debug_verbose( @@ -293,6 +298,10 @@ defmodule Boombox.Pipeline do Boombox.MP4.link_output(location, track_builders, spec_builder) end + defp link_output({:hls, location}, track_builders, spec_builder, _ctx) do + Boombox.HLS.link_output(location, track_builders, spec_builder) + end + defp parse_input(input) when is_binary(input) do uri = URI.new!(input) @@ -320,7 +329,10 @@ defmodule Boombox.Pipeline do case uri do %URI{scheme: nil, path: path} when path != nil -> - {:file, parse_file_extension(path), path} + case parse_file_extension(path) do + :m3u8 -> {:hls, path} + file_type -> {:file, file_type, path} + end _other -> raise "Unsupported URI: #{output}" @@ -331,7 +343,7 @@ defmodule Boombox.Pipeline do output end - @spec parse_file_extension(Path.t()) :: Boombox.file_extension() + @spec parse_file_extension(Path.t()) :: Boombox.file_extension() | :m3u8 defp parse_file_extension(path) do extension = Path.extname(path) diff --git a/mix.exs b/mix.exs index 11e98a7..2bf017d 100644 --- a/mix.exs +++ b/mix.exs @@ -52,8 +52,10 @@ defmodule Boombox.Mixfile do {:membrane_aac_fdk_plugin, "~> 0.18.0"}, {:membrane_h26x_plugin, "~> 0.10.0"}, {:membrane_h264_ffmpeg_plugin, "~> 0.32.0"}, - {:membrane_mp4_plugin, github: "membraneframework/membrane_mp4_plugin", branch: "wip-avc3"}, + {:membrane_mp4_plugin, + github: "membraneframework/membrane_mp4_plugin", branch: "wip-avc3", override: true}, {:membrane_realtimer_plugin, "~> 0.9.0"}, + {:membrane_http_adaptive_stream_plugin, "~> 0.18.0"}, {:membrane_rtmp_plugin, github: "membraneframework/membrane_rtmp_plugin"}, {:membrane_ffmpeg_swresample_plugin, "~> 0.20.0"}, {:membrane_hackney_plugin, "~> 0.11.0"}, diff --git a/mix.lock b/mix.lock index 4a802ea..c908c33 100644 --- a/mix.lock +++ b/mix.lock @@ -52,6 +52,7 @@ "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"}, "membrane_hackney_plugin": {:hex, :membrane_hackney_plugin, "0.11.0", "54b368333a23394e7cac2f4d6b701bf8c5ee6614670a31f4ebe009b5e691a5c1", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "2b28fd1be3c889d5824d7d985598386c7673828c88f49a91221df3626af8a998"}, + "membrane_http_adaptive_stream_plugin": {:hex, :membrane_http_adaptive_stream_plugin, "0.18.4", "f938907eb0e39db2acf8c84e0b770c1976b2793bd58c2e37eca0334dc019f11d", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_plugin, "~> 0.18.0", [hex: :membrane_aac_plugin, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.0", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.34.1", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:membrane_tee_plugin, "~> 0.12.0", [hex: :membrane_tee_plugin, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "3f9174faf1f734f1b8507aeea0525384869797e51b88f1b2e33931bae9de32fc"}, "membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"}, "membrane_mp4_plugin": {:git, "https://github.com/membraneframework/membrane_mp4_plugin.git", "3786a3e834cc2ce541c3c7830590b39da87e12a1", [branch: "wip-avc3"]}, "membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"}, @@ -66,6 +67,7 @@ "membrane_rtp_opus_plugin": {:hex, :membrane_rtp_opus_plugin, "0.9.0", "ae76421faa04697a4af76a55b6c5e675dea61b611d29d8201098783d42863af7", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "58f095d2978daf999d87c1c016007cb7d99434208486331ab5045e77f5be9dcc"}, "membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.29.0", "0277310eb599b8e6de9e0b864807f23b3b245865e39a28f0cbab695d1f2c157e", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "1b3fd808114e06332b6a4e000238998a9188d1ef625c414ca3239aee70f0775d"}, "membrane_rtp_vp8_plugin": {:hex, :membrane_rtp_vp8_plugin, "0.9.1", "9e8a74d764730a23382ba862a238963c9639b4c6963238caeb6fe2449a66add8", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_vp8_format, "~> 0.4.0", [hex: :membrane_vp8_format, repo: "hexpm", optional: false]}], "hexpm", "704856eb2734bb6ea5cc47242c241de45debb5724a81cffb344bacda9867fe98"}, + "membrane_tee_plugin": {:hex, :membrane_tee_plugin, "0.12.0", "f94989b4080ef4b7937d74c1a14d3379577c7bd4c6d06e5a2bb41c351ad604d4", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "0d61c9ed5e68e5a75d54200e1c6df5739c0bcb52fee0974183ad72446a179887"}, "membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"}, "membrane_vp8_format": {:hex, :membrane_vp8_format, "0.4.0", "6c29ec67479edfbab27b11266dc92f18f3baf4421262c5c31af348c33e5b92c7", [:mix], [], "hexpm", "8bb005ede61db8fcb3535a883f32168b251c2dfd1109197c8c3b39ce28ed08e2"}, "membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.21.0", "0d47a6ffe3eb18abf43e9f6d089a409120ecd5cff43095d065fbb9e1c038f79c", [:mix], [{:bandit, "~> 1.2", [hex: :bandit, repo: "hexpm", optional: false]}, {:ex_webrtc, "~> 0.3.0", [hex: :ex_webrtc, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, "~> 0.9.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.29.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.9.1", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.0", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "39d383eadb1b1ce10975ac8505012e901c8961e6f5a65577ff0fbf03b7bc8fc7"}, diff --git a/test/boombox_test.exs b/test/boombox_test.exs index b73229e..6bb8234 100644 --- a/test/boombox_test.exs +++ b/test/boombox_test.exs @@ -22,21 +22,21 @@ defmodule BoomboxTest do async_test "mp4 file -> mp4 file", %{tmp_dir: tmp} do output = Path.join(tmp, "output.mp4") Boombox.run(input: @bbb_mp4, output: output) - Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_aac.mp4") + Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4") end @tag :file_file_mp4_audio async_test "mp4 file -> mp4 file audio", %{tmp_dir: tmp} do output = Path.join(tmp, "output.mp4") Boombox.run(input: @bbb_mp4_a, output: output) - Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", :audio) + Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", kinds: [:audio]) end @tag :file_file_mp4_video async_test "mp4 file -> mp4 file video", %{tmp_dir: tmp} do output = Path.join(tmp, "output.mp4") Boombox.run(input: @bbb_mp4_v, output: output) - Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", :video) + Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", kinds: [:video]) end @tag :http_file_mp4 @@ -76,7 +76,7 @@ defmodule BoomboxTest do Boombox.run(input: {:webrtc, signaling}, output: "#{tmp}/output.mp4") Task.await(t) - Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", :audio) + Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", kinds: [:audio]) end @tag :webrtc_video @@ -89,7 +89,7 @@ defmodule BoomboxTest do Boombox.run(input: {:webrtc, signaling}, output: output) Task.await(t) - Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", :video) + Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", kinds: [:video]) end @tag :webrtc2 @@ -147,6 +147,45 @@ defmodule BoomboxTest do Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4") end + @tag :file_hls + async_test "mp4 file -> hls", %{tmp_dir: tmp} do + manifest_filename = Path.join(tmp, "index.m3u8") + Boombox.run(input: @bbb_mp4, output: manifest_filename) + ref_path = "test/fixtures/ref_bun10s_aac_hls" + Compare.compare(tmp, ref_path, format: :hls) + + Enum.zip( + Path.join(tmp, "*.mp4") |> Path.wildcard(), + Path.join(ref_path, "*.mp4") |> Path.wildcard() + ) + |> Enum.each(fn {output_file, ref_file} -> + assert File.read!(output_file) == File.read!(ref_file) + end) + end + + @tag :rtmp_hls + async_test "rtmp -> hls", %{tmp_dir: tmp} do + manifest_filename = Path.join(tmp, "index.m3u8") + url = "rtmp://localhost:5003/app/stream_key" + ref_path = "test/fixtures/ref_bun10s_aac_hls" + t = Task.async(fn -> Boombox.run(input: url, output: manifest_filename) end) + + # Wait for boombox to be ready + Process.sleep(200) + p = send_rtmp(url) + Task.await(t, 30_000) + Testing.Pipeline.terminate(p) + Compare.compare(tmp, ref_path, format: :hls) + + Enum.zip( + Path.join(tmp, "*.mp4") |> Path.wildcard(), + Path.join(ref_path, "*.mp4") |> Path.wildcard() + ) + |> Enum.each(fn {output_file, ref_file} -> + assert File.read!(output_file) == File.read!(ref_file) + end) + end + defp send_rtmp(url) do p = Testing.Pipeline.start_link_supervised!( diff --git a/test/fixtures/ref_bun10s_aac_hls/g3cFdmlkZW8.m3u8 b/test/fixtures/ref_bun10s_aac_hls/g3cFdmlkZW8.m3u8 new file mode 100644 index 0000000..6a56d7a --- /dev/null +++ b/test/fixtures/ref_bun10s_aac_hls/g3cFdmlkZW8.m3u8 @@ -0,0 +1,13 @@ +#EXTM3U +#EXT-X-VERSION:7 +#EXT-X-TARGETDURATION:5 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-DISCONTINUITY-SEQUENCE:0 +#EXT-X-MAP:URI="muxed_header_g3cFdmlkZW8_part_0.mp4" +#EXTINF:4.80365593, +muxed_segment_0_g3cFdmlkZW8.m4s +#EXTINF:4.288194112, +muxed_segment_1_g3cFdmlkZW8.m4s +#EXTINF:0.924473961, +muxed_segment_2_g3cFdmlkZW8.m4s +#EXT-X-ENDLIST diff --git a/test/fixtures/ref_bun10s_aac_hls/index.m3u8 b/test/fixtures/ref_bun10s_aac_hls/index.m3u8 new file mode 100644 index 0000000..6d0cd5f --- /dev/null +++ b/test/fixtures/ref_bun10s_aac_hls/index.m3u8 @@ -0,0 +1,5 @@ +#EXTM3U +#EXT-X-VERSION:7 +#EXT-X-INDEPENDENT-SEGMENTS +#EXT-X-STREAM-INF:BANDWIDTH=1176608,AVERAGE-BANDWIDTH=969642,RESOLUTION=480x270,CODECS="avc1.42e015,mp4a.40.2" +g3cFdmlkZW8.m3u8 \ No newline at end of file diff --git a/test/fixtures/ref_bun10s_aac_hls/muxed_header_g3cFdmlkZW8_part_0.mp4 b/test/fixtures/ref_bun10s_aac_hls/muxed_header_g3cFdmlkZW8_part_0.mp4 new file mode 100644 index 0000000..8858a8c Binary files /dev/null and b/test/fixtures/ref_bun10s_aac_hls/muxed_header_g3cFdmlkZW8_part_0.mp4 differ diff --git a/test/fixtures/ref_bun10s_aac_hls/muxed_segment_0_g3cFdmlkZW8.m4s b/test/fixtures/ref_bun10s_aac_hls/muxed_segment_0_g3cFdmlkZW8.m4s new file mode 100644 index 0000000..559de66 Binary files /dev/null and b/test/fixtures/ref_bun10s_aac_hls/muxed_segment_0_g3cFdmlkZW8.m4s differ diff --git a/test/fixtures/ref_bun10s_aac_hls/muxed_segment_1_g3cFdmlkZW8.m4s b/test/fixtures/ref_bun10s_aac_hls/muxed_segment_1_g3cFdmlkZW8.m4s new file mode 100644 index 0000000..e8f838d Binary files /dev/null and b/test/fixtures/ref_bun10s_aac_hls/muxed_segment_1_g3cFdmlkZW8.m4s differ diff --git a/test/fixtures/ref_bun10s_aac_hls/muxed_segment_2_g3cFdmlkZW8.m4s b/test/fixtures/ref_bun10s_aac_hls/muxed_segment_2_g3cFdmlkZW8.m4s new file mode 100644 index 0000000..cf4e8b4 Binary files /dev/null and b/test/fixtures/ref_bun10s_aac_hls/muxed_segment_2_g3cFdmlkZW8.m4s differ diff --git a/test/support/compare.ex b/test/support/compare.ex index 9879570..c58424c 100644 --- a/test/support/compare.ex +++ b/test/support/compare.ex @@ -9,6 +9,8 @@ defmodule Support.Compare do alias Membrane.Testing + @type compare_option :: {:kinds, [:audio | :video]} | {:format, :mp4 | :hls} + defmodule GetBuffers do @moduledoc false use Membrane.Sink @@ -31,19 +33,30 @@ defmodule Support.Compare do end end - @spec compare(Path.t(), Path.t(), [:audio | :video]) :: :ok - def compare(subject, reference, kinds \\ [:audio, :video]) do - kinds = Bunch.listify(kinds) + @spec compare(Path.t(), Path.t(), [compare_option()]) :: :ok + def compare(subject, reference, options \\ []) do + kinds = options[:kinds] || [:audio, :video] + format = options[:format] || :mp4 p = Testing.Pipeline.start_link_supervised!() - Testing.Pipeline.execute_actions(p, - spec: [ - child(%Membrane.File.Source{location: subject, seekable?: true}) - |> child(:sub_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}), - child(%Membrane.File.Source{location: reference, seekable?: true}) - |> child(:ref_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}) - ] - ) + head_spec = + case format do + :mp4 -> + [ + child(%Membrane.File.Source{location: subject, seekable?: true}) + |> child(:sub_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}), + child(%Membrane.File.Source{location: reference, seekable?: true}) + |> child(:ref_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}) + ] + + :hls -> + [ + child(:sub_demuxer, %Membrane.HTTPAdaptiveStream.Source{directory: subject}), + child(:ref_demuxer, %Membrane.HTTPAdaptiveStream.Source{directory: reference}) + ] + end + + Testing.Pipeline.execute_actions(p, spec: head_spec) assert_pipeline_notified(p, :ref_demuxer, {:new_tracks, tracks}) diff --git a/test/support/http_adaptive_stream_source.ex b/test/support/http_adaptive_stream_source.ex new file mode 100644 index 0000000..e165764 --- /dev/null +++ b/test/support/http_adaptive_stream_source.ex @@ -0,0 +1,140 @@ +defmodule Membrane.HTTPAdaptiveStream.Source do + @moduledoc false + + use Membrane.Source + + alias Membrane.{Buffer, MP4} + alias Membrane.MP4.MovieBox.TrackBox + + def_options directory: [ + spec: Path.t(), + description: "directory containing hls files" + ] + + def_output_pad :output, + accepted_format: any_of(%Membrane.H264{}, %Membrane.AAC{}), + availability: :on_request, + flow_control: :manual, + demand_unit: :buffers + + @impl true + def handle_init(_ctx, opts) do + {parsed_header, ""} = + get_prefixed_files(opts.directory, "muxed_header") + |> List.first() + |> File.read!() + |> MP4.Container.parse!() + + %{ + audio_track: %MP4.Track{id: audio_id, stream_format: audio_stream_format}, + video_track: %MP4.Track{id: video_id, stream_format: video_stream_format} + } = + parsed_header[:moov].children + |> Keyword.get_values(:trak) + |> Enum.map(&TrackBox.unpack/1) + |> Enum.reduce(%{audio_track: nil, video_track: nil}, fn track, tracks_map -> + case track.stream_format do + %Membrane.AAC{} -> %{tracks_map | audio_track: track} + %Membrane.H264{} -> %{tracks_map | video_track: track} + _other -> tracks_map + end + end) + + segments_filenames = get_prefixed_files(opts.directory, "muxed_segment") |> Enum.sort() + + {audio_buffers, video_buffers} = + Enum.map(segments_filenames, fn file -> + %{^audio_id => segment_audio_buffers, ^video_id => segment_video_buffers} = + get_buffers_from_segment(file) + + {segment_audio_buffers, segment_video_buffers} + end) + |> Enum.unzip() + |> then(fn {audio_buffers, video_buffers} -> + {List.flatten(audio_buffers), List.flatten(video_buffers)} + end) + + state = + %{ + track_data: %{ + audio: %{ + stream_format: audio_stream_format, + buffers_left: audio_buffers + }, + video: %{ + stream_format: video_stream_format, + buffers_left: video_buffers + } + } + } + + {[ + notify_parent: + {:new_tracks, [{:audio, audio_stream_format}, {:video, video_stream_format}]} + ], state} + end + + @impl true + def handle_pad_added(Pad.ref(:output, id) = pad, _ctx, state) do + {[stream_format: {pad, state.track_data[id].stream_format}], state} + end + + @impl true + def handle_demand(Pad.ref(:output, id) = pad, demand_size, :buffers, _ctx, state) do + {buffers_to_send, buffers_left} = state.track_data[id].buffers_left |> Enum.split(demand_size) + + actions = + if buffers_left == [] do + [buffer: {pad, buffers_to_send}, end_of_stream: pad] + else + [buffer: {pad, buffers_to_send}] + end + + state = put_in(state, [:track_data, id, :buffers_left], buffers_left) + + {actions, state} + end + + @spec get_buffers_from_segment(Path.t()) :: %{(track_id :: pos_integer()) => [Buffer.t()]} + defp get_buffers_from_segment(segment_filename) do + {container, ""} = segment_filename |> File.read!() |> MP4.Container.parse!() + + Enum.zip( + Keyword.get_values(container, :moof), + Keyword.get_values(container, :mdat) + ) + |> Enum.map(fn {moof_box, mdat_box} -> + traf_box_children = moof_box.children[:traf].children + + sample_sizes = + traf_box_children[:trun].fields.samples + |> Enum.map(& &1.sample_size) + + buffers = get_buffers_from_samples(sample_sizes, mdat_box.content) + + {traf_box_children[:tfhd].fields.track_id, buffers} + end) + |> Enum.into(%{}) + end + + @spec get_buffers_from_samples([pos_integer()], binary()) :: [Buffer.t()] + defp get_buffers_from_samples([], <<>>) do + [] + end + + defp get_buffers_from_samples([first_sample_length | sample_lengths_rest], samples_binary) do + <> = samples_binary + + [ + %Buffer{payload: payload} + | get_buffers_from_samples(sample_lengths_rest, samples_binary_rest) + ] + end + + @spec get_prefixed_files(Path.t(), String.t()) :: [Path.t()] + defp get_prefixed_files(directory, prefix) do + File.ls!(directory) + |> Enum.filter(&String.starts_with?(&1, prefix)) + |> Enum.map(&Path.join(directory, &1)) + end +end