
Gateway publish to orchestrator. #3211

Draft · wants to merge 5 commits into base: ja/add-mediamtx-listener

Conversation

j0sh (Collaborator) commented Oct 18, 2024

  • Requires "Scaffolding for realtime-to-realtime" #3210
  • Sets up trickle HTTP endpoints on the orchestrator (requires Go 1.22 for the new routes; a separate patch will bump go.mod and handle any CI adjustments)
  • Pulls an RTMP stream from MediaMTX on the gateway when a new stream comes in
  • Converts the RTMP stream into MPEG-TS segments
  • Publishes segments to the orchestrator via trickle HTTP (a rough sketch of this flow follows below)
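
As a rough illustration of that publish flow (one HTTP POST per MPEG-TS segment to the orchestrator's trickle endpoint), the loop below is a sketch only; the per-segment URL scheme and the nextSegment callback are assumptions, not the actual API in this PR:

package sketch // illustrative only

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// publishSegments is a rough sketch of the gateway's trickle publish loop:
// it pulls MPEG-TS segments from a reader callback and POSTs each one to
// the orchestrator's publish endpoint. The per-segment URL scheme and the
// nextSegment signature are assumptions, not the PR's actual API.
func publishSegments(publishURL string, nextSegment func() (io.Reader, error)) error {
	for seq := 0; ; seq++ {
		seg, err := nextSegment()
		if err == io.EOF {
			return nil // stream ended
		}
		if err != nil {
			return err
		}
		var buf bytes.Buffer
		if _, err := io.Copy(&buf, seg); err != nil {
			return err
		}
		// One HTTP POST per segment; a sequence number in the path is a
		// guess at how segments might be distinguished.
		resp, err := http.Post(fmt.Sprintf("%s/%d", publishURL, seq), "video/mp2t", &buf)
		if err != nil {
			return err
		}
		resp.Body.Close()
	}
}

In the PR itself the segments come from media.RunSegmentation feeding a SwitchableSegmentReader, as shown in the review snippets below.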

j0sh mentioned this pull request on Oct 18, 2024 (5 tasks)

leszko (Contributor) left a comment

Good work. I think this is a very important PR, because it describes the protocol between G<>O. I added 2 comments, because I think I still don't understand how we plan it all to work.

Other than that, @j0sh is it possible to actually run the code from this PR? If yes, could you describe how to run it locally? I think if I play with it, I can add some more comments and understand it better.

// Kick off the RTMP pull and segmentation as soon as possible
ssr := media.NewSwitchableSegmentReader()
go func() {
	media.RunSegmentation("rtmp://localhost/"+streamName, ssr.Read)

leszko (Contributor):

Does it mean that the Gateway will pull the stream from MediaMTX? Isn't that the opposite of what you designed in the diagram?

[diagram omitted]

j0sh (Collaborator Author):

Yes, the gateway initiates the RTMP pull, but the flow of media still goes from mediamtx -> gateway.

I guess there could be a better distinction between "who initiates the pull" vis-a-vis the actual flow of media, but RTMP isn't really a request-response protocol in the same way the other HTTP flows are in this diagram. Open to suggestions.

leszko (Contributor):

Maybe you can add one more arrow labeled "initiate RTMP" 🙃 For me it is/was pretty confusing.

	u = sess.Transcoder() + u
	return url.Parse(u)
}
pub, err := appendHostname(resp.JSON200.PublishUrl)

leszko (Contributor):

Why do you need PublishUrl in the Orchestrator? Isn't it the Gateway publishing to the Orchestrator?

I think in general it's hard to grasp how this trickle server works. Could you describe it somewhere? I understand it should correspond to the diagram from your doc, but where is the publish/subscribe part?

[diagrams omitted]

j0sh (Collaborator Author):

Why do you need PublishUrl in the Orchestrator? Isn't it the Gateway publishing to the Orchestrator?

Yeah, this was a bit of a spur-of-the-moment addition when I was looking at the overall flow, where we are doing this request -> response call anyway. This gives the exact endpoint where a publish should happen. Likewise for the subscribe URL - it tells the gateway where to pull the results.

We could skip this entirely and hard-code the URLs via well-known paths, distinguish jobs via IDs in HTTP headers, etc., but this is an easy way for us to add a bit of topological flexibility without breaking the protocol later (eg, routing the stream to a different machine).

In fact, from some of the conversations on Discord right now, there is probably another way to make this even more robust: returning a list of subscribe URLs (think multiple renditions for low-latency video transcoding).
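
For reference, a hedged reconstruction of how the gateway might resolve those endpoints against the orchestrator address, based on the appendHostname snippet quoted above (the SubscribeUrl field name is an assumption):

package sketch // illustrative only

import "net/url"

// resolveTrickleURLs is a hedged sketch of turning the orchestrator's
// response into absolute publish/subscribe endpoints. PublishUrl comes from
// the quoted snippet; a matching SubscribeUrl field is an assumption based
// on the comment above.
func resolveTrickleURLs(orchBase, publishPath, subscribePath string) (pub, sub *url.URL, err error) {
	appendHostname := func(u string) (*url.URL, error) {
		// prefix relative paths with the orchestrator (transcoder) address
		return url.Parse(orchBase + u)
	}
	if pub, err = appendHostname(publishPath); err != nil {
		return nil, nil, err
	}
	if sub, err = appendHostname(subscribePath); err != nil {
		return nil, nil, err
	}
	return pub, sub, nil
}

On the gateway side this would be called with something like resolveTrickleURLs(sess.Transcoder(), resp.JSON200.PublishUrl, resp.JSON200.SubscribeUrl): pub is where segments get POSTed and sub is where results get pulled.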

j0sh (Collaborator Author) commented Oct 21, 2024

Force-pushed to fix merge conflicts from the ai-video rebase

is it possible to actually run the code from this PR? If yes, could you describe how to run it locally?

Absolutely! The big thing is making sure you have MediaMTX running. This is pretty simple - it is a single executable + config file. Download a pre-built release and use this config file [1]. Stick the executable in the same directory as the config and run ./mediamtx

Then for the gateway:

./livepeer -gateway -rtmpAddr :1936 -httpAddr :5936 -orchAddr localhost:8935

For the orchestrator + worker:

./livepeer -orchestrator -aiWorker -aiModels 'live-video-to-video:stream-diffusion:false' -serviceAddr localhost:8935 -transcoder

Publish to http://<mediamtx-host>:8889/streamname/publish ; see more details in livepeer/ai-worker#209

I still don't understand how we plan it all to work

That's fair; as mentioned earlier I think that is partially a result of recent pressure to deliver things without enough time to fully design an end-to-end flow within go-livepeer.

BTW, I am afraid that this PR is probably not quite enough for you to base your work on just yet, if the plan is still to carry payments within the media stream. The trickle server on the orchestrator only behaves as a simple pipe between publisher and subscriber, and does not (yet) have any mechanisms to execute additional code based on incoming segments, eg for us to process PM tickets or record metrics for selection. We also need to adjust the publisher API to include custom headers. I'll have those in within the next day or so.
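
To illustrate the custom-headers point, a per-segment publish request might carry payment metadata roughly like this (the header name and helper function are invented placeholders, not part of this PR):

package sketch // illustrative only

import (
	"io"
	"net/http"
)

// publishWithHeaders is a hypothetical variant of the per-segment publish
// request that attaches custom headers (eg a serialized payment ticket).
// The "Livepeer-Payment" header name is invented for illustration.
func publishWithHeaders(publishURL, paymentTicket string, segment io.Reader) error {
	req, err := http.NewRequest(http.MethodPost, publishURL, segment)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "video/mp2t")
	req.Header.Set("Livepeer-Payment", paymentTicket)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}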

NB: One elephant in the room here is "how do we integrate mediamtx into our infrastructure" and I will spin up a separate thread to discuss that.

[1] The only differences between this config file and the MediaMTX sample config are the addition of a runOnReady curl hook and enabling STUN support for WebRTC (because my cloud dev box doesn't work without it; prod might vary). Diff below:

diff --git a/mediamtx.yml b/mediamtx.yml
index c3aed76..cf7c60c 100644
--- a/mediamtx.yml
+++ b/mediamtx.yml
@@ -376,8 +376,8 @@ webrtcAdditionalHosts: []
 # ICE servers. Needed only when local listeners can't be reached by clients.
 # STUN servers allows to obtain and share the public IP of the server.
 # TURN/TURNS servers forces all traffic through them.
-webrtcICEServers2: []
-  # - url: stun:stun.l.google.com:19302
+webrtcICEServers2:
+  - url: stun:stun.l.google.com:19302
   # if user is "AUTH_SECRET", then authentication is secret based.
   # the secret must be inserted into the password field.
   # username: ''
@@ -643,7 +643,7 @@ pathDefaults:
   #   a regular expression.
   # * MTX_SOURCE_TYPE: source type
   # * MTX_SOURCE_ID: source ID
-  runOnReady:
+  runOnReady: curl http://localhost:5936/live-video-start -F stream=$MTX_PATH
   # Restart the command if it exits.
   runOnReadyRestart: no
   # Command to run when the stream is not available anymore.
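
For context, the gateway side of that runOnReady hook could look roughly like the sketch below; the handler path and form field come from the config above, while the helper names are placeholders:

package sketch // illustrative only

import "net/http"

// handleLiveVideoStart is a rough guess at the gateway handler behind the
// runOnReady hook above: MediaMTX posts the stream path as a form field
// named "stream" ($MTX_PATH), and the gateway kicks off the RTMP pull and
// segmentation for that stream (see the RunSegmentation snippet quoted
// earlier in the review).
func handleLiveVideoStart(w http.ResponseWriter, r *http.Request) {
	streamName := r.FormValue("stream")
	if streamName == "" {
		http.Error(w, "missing stream name", http.StatusBadRequest)
		return
	}
	go startSegmentation("rtmp://localhost/" + streamName)
	w.WriteHeader(http.StatusOK)
}

// startSegmentation stands in for the RTMP pull + segmentation kickoff
// (media.NewSwitchableSegmentReader / media.RunSegmentation in the PR).
func startSegmentation(rtmpURL string) {}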

leszko (Contributor) commented Oct 22, 2024

Absolutely! The big thing is making sure you have MediaMTX running. This is pretty simple - it is a single executable + config file.

Thanks. I think I have it working 🙏

}

func (sm *StreamManager) handlePost(w http.ResponseWriter, r *http.Request) {
	stream := sm.getOrCreateStream(r.PathValue("streamName"))

leszko (Contributor):

IIUC here we'll need to handle payment (or at least have some check that the segment is covered). Is that correct @j0sh?

j0sh (Collaborator Author):

Mostly yes, but not in this exact spot, because not all streams will have payments - the result stream, for example.

I can add some middleware for the O to handle incoming segments and you can add in payments from there, eg by modifying outgoing headers. But if you are thinking about an out-of-band payments mechanism then the middleware can probably be skipped for now.
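
A minimal sketch of what such middleware could look like, assuming a per-segment hook wrapped around the existing handlePost (names are placeholders, not this PR's API):

package sketch // illustrative only

import "net/http"

// SegmentHook is a hypothetical per-segment callback on the orchestrator's
// trickle publish endpoint, invoked before the bytes are relayed to
// subscribers; PM-ticket checks or selection metrics could live here.
type SegmentHook func(streamName string, r *http.Request) error

// withSegmentHook is a sketch (not the PR's API) of wrapping the existing
// handlePost with such a hook. r.PathValue requires Go 1.22, matching the
// routing used elsewhere in this PR.
func withSegmentHook(hook SegmentHook, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if err := hook(r.PathValue("streamName"), r); err != nil {
			// eg reject the segment if it isn't covered by a payment
			http.Error(w, err.Error(), http.StatusPaymentRequired)
			return
		}
		next(w, r)
	}
}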

leszko (Contributor):

Yeah, even if we do the payment outside of this flow, we'll still need a check somewhere for "is it paid" in order to stop sending segments when there's no payment.

j0sh (Collaborator Author):

Yep - payments outside the flow can be a little simpler, eg a local (in-memory) subscriber rather than middleware that modifies the stream. Then you can gather any needed metrics within the subscribe callback and use those to check payments.

BTW if we do payments by "time" then it might not even be needed!
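
A minimal sketch of that local-subscriber idea, with a hypothetical channel-based API and a paidThrough callback standing in for the payment layer:

package sketch // illustrative only

import "time"

// SegmentInfo is a hypothetical summary handed to local subscribers.
type SegmentInfo struct {
	StreamName string
	Duration   time.Duration
}

// trackUsage sketches a local (in-memory) subscriber: it tallies how much
// media has flowed per stream so a payment check can decide whether to keep
// accepting segments. The channel-based API and paidThrough callback are
// assumptions for illustration only.
func trackUsage(segments <-chan SegmentInfo, paidThrough func(streamName string) time.Duration) {
	usage := map[string]time.Duration{}
	for seg := range segments {
		usage[seg.StreamName] += seg.Duration
		if usage[seg.StreamName] > paidThrough(seg.StreamName) {
			// stop accepting/relaying further segments for this stream
			// (enforcement is out of scope for this sketch)
			return
		}
	}
}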
