Skip to content

Commit

Permalink
HLS output (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
Noarkhh authored Aug 19, 2024
1 parent ab533fe commit 717a81f
Show file tree
Hide file tree
Showing 18 changed files with 335 additions and 21 deletions.
12 changes: 11 additions & 1 deletion examples.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Mix.install([{:boombox, path: __DIR__}, :kino])
:inets.start(:httpd,
bind_address: ~c"localhost",
port: 1234,
document_root: ~c"#{__DIR__}/examples_assets/",
document_root: ~c"#{__DIR__}/examples_assets",
server_name: ~c"assets_server",
server_root: "/tmp",
erl_script_nocache: true
Expand Down Expand Up @@ -77,6 +77,16 @@ Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: {:webrtc, "ws://loc

<!-- livebook:{"branch_parent_index":0} -->

## MP4 to HLS

To receive the stream, visit http://localhost:1234/hls/stream.html after running the cell below

```elixir
Boombox.run(input: bbb_mp4, output: {:hls, "examples_assets/hls/hls_output"})
```

<!-- livebook:{"branch_parent_index":0} -->

## RTMP to MP4

```elixir
Expand Down
3 changes: 3 additions & 0 deletions examples_assets/hls/hls_output/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!.gitkeep
!.gitignore
Empty file.
17 changes: 17 additions & 0 deletions examples_assets/hls/stream.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">

<body>
<script src="https://cdn.jsdelivr.net/npm/hls.js@latest"></script>
<h1>Membrane Boombox HLS Example</h1>
<video id="player" autoplay muted controls></video>
<script>
var video = document.getElementById('player');
var hls = new Hls();
hls.loadSource("hls_output/index.m3u8");
hls.attachMedia(video);
</script>

</body>

</html>
1 change: 1 addition & 0 deletions lib/boombox.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule Boombox do
| Path.t()
| {:file, file_extension(), Path.t()}
| {:webrtc, webrtc_opts()}
| {:hls, Path.t()}

@spec run(input: input, output: output) :: :ok
def run(opts) do
Expand Down
57 changes: 57 additions & 0 deletions lib/boombox/hls.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
defmodule Boombox.HLS do
@moduledoc false

import Membrane.ChildrenSpec

require Membrane.Pad, as: Pad
alias Boombox.Pipeline.Ready
alias Membrane.Time

@spec link_output(
Path.t(),
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
{directory, manifest_name} =
if Path.extname(location) == ".m3u8" do
{Path.dirname(location), Path.basename(location, ".m3u8")}
else
{location, "index"}
end

spec =
[
spec_builder,
child(
:hls_sink_bin,
%Membrane.HTTPAdaptiveStream.SinkBin{
manifest_name: manifest_name,
manifest_module: Membrane.HTTPAdaptiveStream.HLS,
storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
directory: directory
},
hls_mode: :muxed_av
}
),
Enum.map(track_builders, fn
{:audio, builder} ->
builder
|> child(:hls_out_aac_encoder, Membrane.AAC.FDK.Encoder)
|> via_in(Pad.ref(:input, :audio),
options: [encoding: :AAC, segment_duration: Time.milliseconds(2000)]
)
|> get_child(:hls_sink_bin)

{:video, builder} ->
builder
|> via_in(Pad.ref(:input, :video),
options: [encoding: :H264, segment_duration: Time.milliseconds(2000)]
)
|> get_child(:hls_sink_bin)
end)
]

%Ready{actions: [spec: spec]}
end
end
18 changes: 15 additions & 3 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Boombox.Pipeline do

require Membrane.Logger

@supported_file_extensions %{".mp4" => :mp4}
@supported_file_extensions %{".mp4" => :mp4, ".m3u8" => :m3u8}

@type track_builders :: %{
optional(:audio) => Membrane.ChildrenSpec.t(),
Expand Down Expand Up @@ -149,6 +149,11 @@ defmodule Boombox.Pipeline do
end
end

@impl true
def handle_child_notification(:end_of_stream, :hls_sink_bin, _ctx, state) do
{[terminate: :normal], state}
end

@impl true
def handle_child_notification(notification, child, _ctx, state) do
Membrane.Logger.debug_verbose(
Expand Down Expand Up @@ -293,6 +298,10 @@ defmodule Boombox.Pipeline do
Boombox.MP4.link_output(location, track_builders, spec_builder)
end

defp link_output({:hls, location}, track_builders, spec_builder, _ctx) do
Boombox.HLS.link_output(location, track_builders, spec_builder)
end

defp parse_input(input) when is_binary(input) do
uri = URI.new!(input)

Expand Down Expand Up @@ -320,7 +329,10 @@ defmodule Boombox.Pipeline do

case uri do
%URI{scheme: nil, path: path} when path != nil ->
{:file, parse_file_extension(path), path}
case parse_file_extension(path) do
:m3u8 -> {:hls, path}
file_type -> {:file, file_type, path}
end

_other ->
raise "Unsupported URI: #{output}"
Expand All @@ -331,7 +343,7 @@ defmodule Boombox.Pipeline do
output
end

@spec parse_file_extension(Path.t()) :: Boombox.file_extension()
@spec parse_file_extension(Path.t()) :: Boombox.file_extension() | :m3u8
defp parse_file_extension(path) do
extension = Path.extname(path)

Expand Down
4 changes: 3 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ defmodule Boombox.Mixfile do
{:membrane_aac_fdk_plugin, "~> 0.18.0"},
{:membrane_h26x_plugin, "~> 0.10.0"},
{:membrane_h264_ffmpeg_plugin, "~> 0.32.0"},
{:membrane_mp4_plugin, github: "membraneframework/membrane_mp4_plugin", branch: "wip-avc3"},
{:membrane_mp4_plugin,
github: "membraneframework/membrane_mp4_plugin", branch: "wip-avc3", override: true},
{:membrane_realtimer_plugin, "~> 0.9.0"},
{:membrane_http_adaptive_stream_plugin, "~> 0.18.0"},
{:membrane_rtmp_plugin, github: "membraneframework/membrane_rtmp_plugin"},
{:membrane_ffmpeg_swresample_plugin, "~> 0.20.0"},
{:membrane_hackney_plugin, "~> 0.11.0"},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"},
"membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"},
"membrane_hackney_plugin": {:hex, :membrane_hackney_plugin, "0.11.0", "54b368333a23394e7cac2f4d6b701bf8c5ee6614670a31f4ebe009b5e691a5c1", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "2b28fd1be3c889d5824d7d985598386c7673828c88f49a91221df3626af8a998"},
"membrane_http_adaptive_stream_plugin": {:hex, :membrane_http_adaptive_stream_plugin, "0.18.4", "f938907eb0e39db2acf8c84e0b770c1976b2793bd58c2e37eca0334dc019f11d", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_plugin, "~> 0.18.0", [hex: :membrane_aac_plugin, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h26x_plugin, "~> 0.10.0", [hex: :membrane_h26x_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.34.1", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:membrane_tee_plugin, "~> 0.12.0", [hex: :membrane_tee_plugin, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "3f9174faf1f734f1b8507aeea0525384869797e51b88f1b2e33931bae9de32fc"},
"membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"},
"membrane_mp4_plugin": {:git, "https://github.com/membraneframework/membrane_mp4_plugin.git", "3786a3e834cc2ce541c3c7830590b39da87e12a1", [branch: "wip-avc3"]},
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
Expand All @@ -66,6 +67,7 @@
"membrane_rtp_opus_plugin": {:hex, :membrane_rtp_opus_plugin, "0.9.0", "ae76421faa04697a4af76a55b6c5e675dea61b611d29d8201098783d42863af7", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "58f095d2978daf999d87c1c016007cb7d99434208486331ab5045e77f5be9dcc"},
"membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.29.0", "0277310eb599b8e6de9e0b864807f23b3b245865e39a28f0cbab695d1f2c157e", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "1b3fd808114e06332b6a4e000238998a9188d1ef625c414ca3239aee70f0775d"},
"membrane_rtp_vp8_plugin": {:hex, :membrane_rtp_vp8_plugin, "0.9.1", "9e8a74d764730a23382ba862a238963c9639b4c6963238caeb6fe2449a66add8", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_vp8_format, "~> 0.4.0", [hex: :membrane_vp8_format, repo: "hexpm", optional: false]}], "hexpm", "704856eb2734bb6ea5cc47242c241de45debb5724a81cffb344bacda9867fe98"},
"membrane_tee_plugin": {:hex, :membrane_tee_plugin, "0.12.0", "f94989b4080ef4b7937d74c1a14d3379577c7bd4c6d06e5a2bb41c351ad604d4", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "0d61c9ed5e68e5a75d54200e1c6df5739c0bcb52fee0974183ad72446a179887"},
"membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"},
"membrane_vp8_format": {:hex, :membrane_vp8_format, "0.4.0", "6c29ec67479edfbab27b11266dc92f18f3baf4421262c5c31af348c33e5b92c7", [:mix], [], "hexpm", "8bb005ede61db8fcb3535a883f32168b251c2dfd1109197c8c3b39ce28ed08e2"},
"membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.21.0", "0d47a6ffe3eb18abf43e9f6d089a409120ecd5cff43095d065fbb9e1c038f79c", [:mix], [{:bandit, "~> 1.2", [hex: :bandit, repo: "hexpm", optional: false]}, {:ex_webrtc, "~> 0.3.0", [hex: :ex_webrtc, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, "~> 0.9.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.29.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.9.1", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.0", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "39d383eadb1b1ce10975ac8505012e901c8961e6f5a65577ff0fbf03b7bc8fc7"},
Expand Down
49 changes: 44 additions & 5 deletions test/boombox_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ defmodule BoomboxTest do
async_test "mp4 file -> mp4 file", %{tmp_dir: tmp} do
output = Path.join(tmp, "output.mp4")
Boombox.run(input: @bbb_mp4, output: output)
Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_aac.mp4")
Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4")
end

@tag :file_file_mp4_audio
async_test "mp4 file -> mp4 file audio", %{tmp_dir: tmp} do
output = Path.join(tmp, "output.mp4")
Boombox.run(input: @bbb_mp4_a, output: output)
Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", :audio)
Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", kinds: [:audio])
end

@tag :file_file_mp4_video
async_test "mp4 file -> mp4 file video", %{tmp_dir: tmp} do
output = Path.join(tmp, "output.mp4")
Boombox.run(input: @bbb_mp4_v, output: output)
Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", :video)
Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4", kinds: [:video])
end

@tag :http_file_mp4
Expand Down Expand Up @@ -76,7 +76,7 @@ defmodule BoomboxTest do

Boombox.run(input: {:webrtc, signaling}, output: "#{tmp}/output.mp4")
Task.await(t)
Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", :audio)
Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", kinds: [:audio])
end

@tag :webrtc_video
Expand All @@ -89,7 +89,7 @@ defmodule BoomboxTest do

Boombox.run(input: {:webrtc, signaling}, output: output)
Task.await(t)
Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", :video)
Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4", kinds: [:video])
end

@tag :webrtc2
Expand Down Expand Up @@ -147,6 +147,45 @@ defmodule BoomboxTest do
Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4")
end

@tag :file_hls
async_test "mp4 file -> hls", %{tmp_dir: tmp} do
manifest_filename = Path.join(tmp, "index.m3u8")
Boombox.run(input: @bbb_mp4, output: manifest_filename)
ref_path = "test/fixtures/ref_bun10s_aac_hls"
Compare.compare(tmp, ref_path, format: :hls)

Enum.zip(
Path.join(tmp, "*.mp4") |> Path.wildcard(),
Path.join(ref_path, "*.mp4") |> Path.wildcard()
)
|> Enum.each(fn {output_file, ref_file} ->
assert File.read!(output_file) == File.read!(ref_file)
end)
end

@tag :rtmp_hls
async_test "rtmp -> hls", %{tmp_dir: tmp} do
manifest_filename = Path.join(tmp, "index.m3u8")
url = "rtmp://localhost:5003/app/stream_key"
ref_path = "test/fixtures/ref_bun10s_aac_hls"
t = Task.async(fn -> Boombox.run(input: url, output: manifest_filename) end)

# Wait for boombox to be ready
Process.sleep(200)
p = send_rtmp(url)
Task.await(t, 30_000)
Testing.Pipeline.terminate(p)
Compare.compare(tmp, ref_path, format: :hls)

Enum.zip(
Path.join(tmp, "*.mp4") |> Path.wildcard(),
Path.join(ref_path, "*.mp4") |> Path.wildcard()
)
|> Enum.each(fn {output_file, ref_file} ->
assert File.read!(output_file) == File.read!(ref_file)
end)
end

defp send_rtmp(url) do
p =
Testing.Pipeline.start_link_supervised!(
Expand Down
13 changes: 13 additions & 0 deletions test/fixtures/ref_bun10s_aac_hls/g3cFdmlkZW8.m3u8
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#EXTM3U
#EXT-X-VERSION:7
#EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-DISCONTINUITY-SEQUENCE:0
#EXT-X-MAP:URI="muxed_header_g3cFdmlkZW8_part_0.mp4"
#EXTINF:4.80365593,
muxed_segment_0_g3cFdmlkZW8.m4s
#EXTINF:4.288194112,
muxed_segment_1_g3cFdmlkZW8.m4s
#EXTINF:0.924473961,
muxed_segment_2_g3cFdmlkZW8.m4s
#EXT-X-ENDLIST
5 changes: 5 additions & 0 deletions test/fixtures/ref_bun10s_aac_hls/index.m3u8
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#EXTM3U
#EXT-X-VERSION:7
#EXT-X-INDEPENDENT-SEGMENTS
#EXT-X-STREAM-INF:BANDWIDTH=1176608,AVERAGE-BANDWIDTH=969642,RESOLUTION=480x270,CODECS="avc1.42e015,mp4a.40.2"
g3cFdmlkZW8.m3u8
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
35 changes: 24 additions & 11 deletions test/support/compare.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Support.Compare do

alias Membrane.Testing

@type compare_option :: {:kinds, [:audio | :video]} | {:format, :mp4 | :hls}

defmodule GetBuffers do
@moduledoc false
use Membrane.Sink
Expand All @@ -31,19 +33,30 @@ defmodule Support.Compare do
end
end

@spec compare(Path.t(), Path.t(), [:audio | :video]) :: :ok
def compare(subject, reference, kinds \\ [:audio, :video]) do
kinds = Bunch.listify(kinds)
@spec compare(Path.t(), Path.t(), [compare_option()]) :: :ok
def compare(subject, reference, options \\ []) do
kinds = options[:kinds] || [:audio, :video]
format = options[:format] || :mp4
p = Testing.Pipeline.start_link_supervised!()

Testing.Pipeline.execute_actions(p,
spec: [
child(%Membrane.File.Source{location: subject, seekable?: true})
|> child(:sub_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}),
child(%Membrane.File.Source{location: reference, seekable?: true})
|> child(:ref_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true})
]
)
head_spec =
case format do
:mp4 ->
[
child(%Membrane.File.Source{location: subject, seekable?: true})
|> child(:sub_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}),
child(%Membrane.File.Source{location: reference, seekable?: true})
|> child(:ref_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true})
]

:hls ->
[
child(:sub_demuxer, %Membrane.HTTPAdaptiveStream.Source{directory: subject}),
child(:ref_demuxer, %Membrane.HTTPAdaptiveStream.Source{directory: reference})
]
end

Testing.Pipeline.execute_actions(p, spec: head_spec)

assert_pipeline_notified(p, :ref_demuxer, {:new_tracks, tracks})

Expand Down
Loading

0 comments on commit 717a81f

Please sign in to comment.