Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HLS HTTP output #31

Open
wants to merge 70 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
2a17063
webrtc, rtmp, mp4
mat-hek May 10, 2024
541c098
add tests
mat-hek Jun 6, 2024
f338b2c
move protocol-specific stuff to separate modules
mat-hek Jun 14, 2024
f8b9830
small pipeline refactor
mat-hek Jun 17, 2024
86356e7
sole audio/video
mat-hek Jun 19, 2024
17930ad
satisfy credo and dialyzer
mat-hek Jun 21, 2024
a0b94a4
use mp4 from github
mat-hek Jun 21, 2024
eda08a9
support passing uri for mp4 and rtmp
mat-hek Jun 21, 2024
abc938c
add examples
mat-hek Jun 21, 2024
85f625c
use released rtp h264
mat-hek Jun 24, 2024
d976188
improve fixture comparisons
mat-hek Jun 24, 2024
81be637
improve async_test
mat-hek Jun 24, 2024
f54acb1
add more comments
mat-hek Jun 24, 2024
b3cfe5b
Apply suggestions from code review
mat-hek Jul 9, 2024
b0901b3
new rtmp api basic functionality works
bartkrak Jul 17, 2024
da15082
little refactor, format
bartkrak Jul 17, 2024
e97f190
code cleanup
bartkrak Jul 18, 2024
5ffcb78
rtsp stream is dumped to mp4 file but isn't playable
bartkrak Jul 22, 2024
4bdb6e6
wip
bartkrak Jul 23, 2024
f292864
url parsing
bartkrak Jul 24, 2024
60ec898
rtmp server under utility supervisor
bartkrak Jul 24, 2024
e4435cf
ci fix
bartkrak Jul 24, 2024
08a557d
dialyzer fix
bartkrak Jul 24, 2024
9b28379
replace lists with tuples for i/o spec
mat-hek Jul 29, 2024
7401a91
small fixes
bartkrak Jul 31, 2024
64d9955
lint fix
bartkrak Jul 31, 2024
13870e1
Merge pull request #12 from membraneframework-labs/use_new_rtmp_api
bartkrak Jul 31, 2024
3d43968
template -> boombox
mat-hek Jul 31, 2024
8812bd4
fix rtmp dep
mat-hek Jul 31, 2024
5f6c646
update examples
mat-hek Jul 31, 2024
72f56ad
use mp4 from WIP branch, as it doesn't work with isom-avc3 branch due…
mat-hek Aug 2, 2024
3521f0f
Merge remote-tracking branch 'origin/poc' into rtsp_input
bartkrak Aug 6, 2024
e47266f
wip
bartkrak Aug 7, 2024
7d8deff
Merge branch 'master' into rtsp_input
bartkrak Aug 7, 2024
13c789d
comment
bartkrak Aug 7, 2024
22a5ff5
Merge branch 'master' into rtsp_input
Noarkhh Aug 19, 2024
560e3c0
WIP
Noarkhh Aug 19, 2024
5a14f08
Add rtsp input
Noarkhh Aug 20, 2024
858ba59
Add a rtsp to hls test
Noarkhh Aug 20, 2024
3444fe1
Add a rtsp to hls test
Noarkhh Aug 23, 2024
1c86076
Adjust to new rtmp api
Noarkhh Aug 23, 2024
079965f
Format
Noarkhh Aug 23, 2024
94d077c
Update deps
Noarkhh Aug 26, 2024
5e6d6bd
Satisfy credo, use fixed opus
Noarkhh Aug 26, 2024
f1fd471
Bump cache version
Noarkhh Aug 26, 2024
0abd412
Use fixed hls
Noarkhh Aug 26, 2024
d5c171c
WIP
Noarkhh Aug 26, 2024
8fdc97c
Make tests pass
Noarkhh Aug 26, 2024
e70ca08
Add an example
Noarkhh Aug 27, 2024
9b570e0
Merge branch 'master' into rtsp_input
Noarkhh Aug 27, 2024
91d4f90
Add server stopping to rtsp example
Noarkhh Aug 27, 2024
3ef8826
Minor refactoring
Noarkhh Aug 27, 2024
e871fcf
Apply reviewers suggestion
Noarkhh Aug 28, 2024
e195097
Adjust new rtsp_plugin api
Noarkhh Aug 29, 2024
44e231f
Add a warning in case of duplicate tracks
Noarkhh Aug 29, 2024
333e69f
Merge branch 'master' into rtsp_input
Noarkhh Sep 2, 2024
06d17b6
Increase min_square_error margin for video
Noarkhh Sep 2, 2024
568dc23
Clean up mix.exs
Noarkhh Sep 2, 2024
e1169d4
Move rtsp_state typing to Boombox.RTSP
Noarkhh Sep 4, 2024
f0ce287
Use released packages
Noarkhh Sep 5, 2024
98f9317
Implement simple hls files uploading
Noarkhh Aug 27, 2024
7100bf0
Allow to stream hls files to a http server
Noarkhh Aug 27, 2024
c2f7a2c
Satisfy credo
Noarkhh Aug 28, 2024
f957fb4
Expand failed DELETE error
Noarkhh Aug 28, 2024
919d58b
Rename Uploader
Noarkhh Aug 28, 2024
e693bcf
Refactor the Uploader
Noarkhh Aug 28, 2024
0ac020c
Use utility supervisor
Noarkhh Aug 28, 2024
f82c981
Fix bad access
Noarkhh Aug 28, 2024
dbbf546
Bump deps
Noarkhh Aug 28, 2024
12cb7a8
Fix parsing opts
Noarkhh Sep 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ workflows:
build:
jobs:
- elixir/build_test:
cache-version: 7
filters: &filters
tags:
only: /v.*/
- elixir/test:
cache-version: 7
filters:
<<: *filters
- elixir/lint:
cache-version: 7
filters:
<<: *filters
- elixir/hex_publish:
cache-version: 7
requires:
- elixir/build_test
- elixir/test
Expand Down
40 changes: 35 additions & 5 deletions examples.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Logger.configure(level: :info)
# For ffmpeg and ffplay commands to work on Mac Livebook Desktop
System.put_env("PATH", "/opt/homebrew/bin:#{System.get_env("PATH")}")

Mix.install([{:boombox, path: __DIR__}, :kino, :nx, :exla, :bumblebee])
Mix.install([{:boombox, path: __DIR__, env: :test}, :kino, :nx, :exla, :bumblebee])

Nx.global_default_backend(EXLA.Backend)
```
Expand Down Expand Up @@ -87,10 +87,40 @@ Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: {:webrtc, "ws://loc
To receive the stream, visit http://localhost:1234/hls/stream.html after running the cell below

```elixir
Boombox.run(
input: "#{input_dir}/bun10s.mp4",
output: {:hls, "#{__DIR__}/examples_assets/hls/hls_output"}
)
hls_out_dir = "#{__DIR__}/examples_assets/hls/hls_output"

"#{hls_out_dir}/*" |> Path.wildcard() |> Enum.each(&File.rm!/1)

Boombox.run(input: "#{input_dir}/bun10s.mp4", output: {:hls, hls_out_dir})
```

<!-- livebook:{"branch_parent_index":0} -->

## RTSP to HLS

To receive the stream, visit http://localhost:1234/hls/stream.html after running the cell below

```elixir
hls_out_dir = "#{__DIR__}/examples_assets/hls/hls_output"

"#{hls_out_dir}/*" |> Path.wildcard() |> Enum.each(&File.rm!/1)

rtp_server_port = 30_003
rtsp_port = 8554

{:ok, server} =
Membrane.RTSP.Server.start_link(
handler: Membrane.Support.RTSP.Server.Handler,
handler_config: %{fixture_path: bbb_mp4},
address: {127, 0, 0, 1},
port: rtsp_port,
udp_rtp_port: rtp_server_port,
udp_rtcp_port: rtp_server_port + 1
)

Boombox.run(input: "rtsp://localhost:#{rtsp_port}/livestream", output: "#{hls_out_dir}/index.m3u8")

Membrane.RTSP.Server.stop(server)
```

<!-- livebook:{"branch_parent_index":0} -->
Expand Down
13 changes: 11 additions & 2 deletions lib/boombox.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ defmodule Boombox do
| {:mp4, location :: String.t(), transport: :file | :http}
| {:webrtc, webrtc_signaling()}
| {:rtmp, (uri :: String.t()) | (client_handler :: pid)}
| {:rtsp, url :: String.t()}
| {:stream, in_stream_opts()}

@type output ::
(path_or_uri :: String.t())
| {:mp4, location :: String.t()}
| {:webrtc, webrtc_signaling()}
| {:hls, location :: String.t()}
| {:hls, location :: String.t(), transport: :file | :http}
| {:stream, out_stream_opts()}

@typep procs :: %{pipeline: pid(), supervisor: pid()}
Expand Down Expand Up @@ -84,7 +85,8 @@ defmodule Boombox do
{scheme, ".mp4", :input} when scheme in [nil, "http", "https"] -> {:mp4, value}
{nil, ".mp4", :output} -> {:mp4, value}
{scheme, _ext, :input} when scheme in ["rtmp", "rtmps"] -> {:rtmp, value}
{nil, ".m3u8", :output} -> {:hls, value}
{"rtsp", _ext, :input} -> {:rtsp, value}
{scheme, ".m3u8", :output} when scheme in [nil, "http", "https"] -> {:hls, value}
_other -> raise ArgumentError, "Unsupported URI: #{value} for direction: #{direction}"
end
|> then(&parse_opt!(direction, &1))
Expand Down Expand Up @@ -113,6 +115,13 @@ defmodule Boombox do
value

{:hls, location} when direction == :output and is_binary(location) ->
parse_opt!(:output, {:hls, location, []})

{:hls, location, opts} when direction == :output and is_binary(location) ->
if Keyword.keyword?(opts),
do: {:hls, location, transport: resolve_transport(location, opts)}

{:rtsp, location} when direction == :input and is_binary(location) ->
value

{:stream, opts} ->
Expand Down
102 changes: 95 additions & 7 deletions lib/boombox/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,112 @@ defmodule Boombox.HLS do
import Membrane.ChildrenSpec

require Membrane.Pad, as: Pad

alias Boombox.Pipeline.Ready
alias Membrane.Time
alias Membrane.{HTTPAdaptiveStream, Time, UtilitySupervisor}

defmodule HTTPUploader do
@moduledoc false
use GenServer

require Logger

alias Membrane.HTTPAdaptiveStream.Storages.GenServerStorage

@spec start_link(directory: String.t()) :: GenServer.on_start()
def start_link(config) do
GenServer.start_link(__MODULE__, config)
end

@impl true
def init(config) do
{:ok, %{directory: config[:directory]}}
end

@impl true
def handle_call(
{GenServerStorage, :store, %{context: %{type: :partial_segment}}},
_from,
state
) do
Logger.warning("LL-HLS is not supported. The partial segment is omitted.")
{:reply, :ok, state}
end

@impl true
def handle_call({GenServerStorage, :store, params}, _from, state) do
location = Path.join(state.directory, params.name)

reply =
:hackney.request(:post, location, [], params.contents, follow_redirect: true)
|> handle_request_result()

{:reply, reply, state}
end

@impl true
def handle_call({GenServerStorage, :remove, params}, _from, state) do
location = Path.join(state.directory, params.name)

reply =
:hackney.request(:delete, location, [], <<>>, follow_redirect: true)
|> handle_request_result()

{:reply, reply, state}
end

@spec handle_request_result(
{:ok, pos_integer(), list(), :hackney.client_ref()}
| {:error, term()}
) :: :ok | {:error, term()}
defp handle_request_result(result) do
case result do
{:ok, status, _headers, _ref} when status in 200..299 ->
:ok

{:ok, status, _headers, ref} ->
{:ok, body} = :hackney.body(ref)
{:error, "Request failed with status code #{status}: #{body}"}

error ->
error
end
end
end

@spec link_output(
Path.t(),
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
Membrane.ChildrenSpec.t(),
UtilitySupervisor.t(),
transport: :file | :http
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
def link_output(location, track_builders, spec_builder, utility_supervisor, opts) do
{directory, manifest_name} =
if Path.extname(location) == ".m3u8" do
{Path.dirname(location), Path.basename(location, ".m3u8")}
else
{location, "index"}
end

storage =
case opts[:transport] do
:file ->
%HTTPAdaptiveStream.Storages.FileStorage{directory: directory}

:http ->
{:ok, uploader} =
UtilitySupervisor.start_link_child(
utility_supervisor,
{HTTPUploader, directory: directory}
)

%HTTPAdaptiveStream.Storages.GenServerStorage{destination: uploader}
end

hls_mode =
if Map.keys(track_builders) == [:video], do: :separate_av, else: :muxed_av

spec =
[
spec_builder,
Expand All @@ -28,10 +118,8 @@ defmodule Boombox.HLS do
%Membrane.HTTPAdaptiveStream.SinkBin{
manifest_name: manifest_name,
manifest_module: Membrane.HTTPAdaptiveStream.HLS,
storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
directory: directory
},
hls_mode: :muxed_av
storage: storage,
hls_mode: hls_mode
}
),
Enum.map(track_builders, fn
Expand Down
5 changes: 4 additions & 1 deletion lib/boombox/mp4.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ defmodule Boombox.MP4 do
{:audio, spec}

{id, %Membrane.H264{}} ->
spec = get_child(:mp4_demuxer) |> via_out(Pad.ref(:output, id))
spec =
get_child(:mp4_demuxer)
|> via_out(Pad.ref(:output, id))

{:video, spec}
end)

Expand Down
24 changes: 21 additions & 3 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ defmodule Boombox.Pipeline do
spec_builder: [],
track_builders: nil,
last_result: nil,
eos_info: nil
eos_info: nil,
rtsp_state: nil
]

@typedoc """
Expand Down Expand Up @@ -90,6 +91,7 @@ defmodule Boombox.Pipeline do
track_builders: Boombox.Pipeline.track_builders() | nil,
last_result: Boombox.Pipeline.Ready.t() | Boombox.Pipeline.Wait.t() | nil,
eos_info: term(),
rtsp_state: Boombox.RTSP.rtsp_state() | nil,
parent: pid()
}
end
Expand All @@ -112,6 +114,18 @@ defmodule Boombox.Pipeline do
|> proceed_result(ctx, state)
end

@impl true
def handle_child_notification({:set_up_tracks, tracks}, :rtsp_source, ctx, state) do
{result, state} = Boombox.RTSP.handle_set_up_tracks(tracks, state)
proceed_result(result, ctx, state)
end

@impl true
def handle_child_notification({:new_track, ssrc, track}, :rtsp_source, ctx, state) do
{result, state} = Boombox.RTSP.handle_input_track(ssrc, track, state)
proceed_result(result, ctx, state)
end

@impl true
def handle_child_notification({:new_tracks, tracks}, :webrtc_input, ctx, state) do
Boombox.WebRTC.handle_input_tracks(tracks)
Expand Down Expand Up @@ -284,6 +298,10 @@ defmodule Boombox.Pipeline do
Boombox.RTMP.create_input(src, ctx.utility_supervisor)
end

defp create_input({:rtsp, uri}, _ctx, _state) do
Boombox.RTSP.create_input(uri)
end

defp create_input({:stream, params}, _ctx, state) do
Boombox.ElixirStream.create_input(state.parent, params)
end
Expand Down Expand Up @@ -314,8 +332,8 @@ defmodule Boombox.Pipeline do
Boombox.MP4.link_output(location, track_builders, spec_builder)
end

defp link_output({:hls, location}, track_builders, spec_builder, _ctx, _state) do
Boombox.HLS.link_output(location, track_builders, spec_builder)
defp link_output({:hls, location, opts}, track_builders, spec_builder, ctx, _state) do
Boombox.HLS.link_output(location, track_builders, spec_builder, ctx.utility_supervisor, opts)
end

defp link_output({:stream, opts}, track_builders, spec_builder, _ctx, state) do
Expand Down
7 changes: 3 additions & 4 deletions lib/boombox/rtmp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ defmodule Boombox.RTMP do
import Membrane.ChildrenSpec
require Membrane.Logger
alias Boombox.Pipeline.{Ready, Wait}
alias Membrane.{RTMP, RTMPServer}
alias Membrane.{RTMP, RTMPServer, UtilitySupervisor}

@spec create_input(String.t() | pid(), pid()) :: Wait.t()
@spec create_input(String.t() | pid(), UtilitySupervisor.t()) :: Wait.t()
def create_input(client_ref, _utility_supervisor) when is_pid(client_ref) do
handle_connection(client_ref)
end
Expand Down Expand Up @@ -43,12 +43,11 @@ defmodule Boombox.RTMP do

@spec handle_connection(pid()) :: Ready.t()
def handle_connection(client_ref) do
spec = [
spec =
child(:rtmp_source, %RTMP.SourceBin{client_ref: client_ref})
|> via_out(:audio)
|> child(:rtmp_in_aac_parser, Membrane.AAC.Parser)
|> child(:rtmp_in_aac_decoder, Membrane.AAC.FDK.Decoder)
]

track_builders = %{
audio: get_child(:rtmp_in_aac_decoder),
Expand Down
Loading