-
Notifications
You must be signed in to change notification settings - Fork 170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Gateway publish to orchestrator. #3211
base: ja/add-mediamtx-listener
Are you sure you want to change the base?
Conversation
j0sh
commented
Oct 18, 2024
- Requires Scaffolding for realtime-to-realtime #3210
- Set up a trickle HTTP endpoints on the orchestrator (requires golang 1.22 for the new routes; will send a separate patch to bump the go.mod and do any CI adjustments)
- Pulls a RTMP stream from MediaMTX on the gateway when a new stream comes in
- Converts the RTMP into mpegts segments
- Publishes segments to the orchestrator via trickle HTTP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work, I think this is a very important PR, because it describes the protocol between G<>O. I added 2 comments, because I think I still don't understand how we plan it all to work.
Other than that, @j0sh is it possible to actually run the code from this PR? If yes, could you describe how to run it locally? I think if I plan with it, I can have some more comments and understand if better.
// Kick off the RTMP pull and segmentation as soon as possible | ||
ssr := media.NewSwitchableSegmentReader() | ||
go func() { | ||
media.RunSegmentation("rtmp://localhost/"+streamName, ssr.Read) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, gateway initiates the RTMP pull, but the flow of media still goes from mediamtx -> gateway
I guess there could be a better distinction between "who initiates the pull" vis-a-vis the actual flow of media, but RTMP isn't really a request-response protocol in the same way the other HTTP flows are in this diagram. Open to suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you can add one more arrow with "initiate RTMP" 🙃 For me it is/was pretty confusing.
u = sess.Transcoder() + u | ||
return url.Parse(u) | ||
} | ||
pub, err := appendHostname(resp.JSON200.PublishUrl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need PublishUrl in Orchestrator? Isn't it Gateway Publishing to Orchestrator?
Yeah this was a bit of a spur of the moment addition when I was looking at the overall flow, where we are doing this request -> response call anyway. This gives the exact endpoint where a publish should happen. Likewise for the subscribe URL - it tells the gateway where to pull the results.
We could skip this entirely and hard-code the URLs via well-known paths, distinguish jobs via ids in HTTP headers, etc but this is an easy way for us to add a bit of topological flexibility without breaking the protocol later (eg, routing the stream to a different machine).
In fact from some of the conversations on Discord right now, there is probably another way to make this even more robust: to get a list of subscribe URLs (think multiple renditions of low latency video transcoding)
a92dacb
to
31077ff
Compare
110cbc6
to
737d637
Compare
Force-pushed to fix merge conflicts from the ai-video rebase
Absolutely! The big thing is making sure you have MediaMTX running. This is pretty simple - it is a single executable + config file. Download a pre-built release and use this config file [1]. Stick the executable in the same directory as the config and run Then for the gateway:
For the orchestrator + worker:
Publish to
That's fair; as mentioned earlier I think that is partially a result of recent pressure to deliver things without enough time to fully design an end-to-end flow within go-livepeer. BTW, I am afraid that this PR is probably not quite enough for you to base your work on just yet, if the plan is still to carry payments within media stream. The trickle server on the orchestrator only behaves as a simple pipe between publisher and subscriber, and does not have any mechanisms (yet) to execute additional code based on incoming segments, eg for us to process PM tickets or record metrics for selection, and we need to adjust the publisher API to also include custom headers. I'll have those in within the next day or so. NB: One elephant in the room here is "how do we integrate mediamtx into our infrastructure" and I will spin up a separate thread to discuss that. [1] The only difference between this config file and the MediaMTX sample config is in the addition of a runOnReady curl hook, and enabling STUN support for WebRTC (because my cloud dev box doesn't work without it; prod might vary). Diff below: diff --git b/mediamtx.yml a/mediamtx.yml
index c3aed76..cf7c60c 100644
--- b/mediamtx.yml
+++ a/mediamtx.yml
@@ -376,8 +376,8 @@ webrtcAdditionalHosts: []
# ICE servers. Needed only when local listeners can't be reached by clients.
# STUN servers allows to obtain and share the public IP of the server.
# TURN/TURNS servers forces all traffic through them.
-webrtcICEServers2: []
- # - url: stun:stun.l.google.com:19302
+webrtcICEServers2:
+ - url: stun:stun.l.google.com:19302
# if user is "AUTH_SECRET", then authentication is secret based.
# the secret must be inserted into the password field.
# username: ''
@@ -643,7 +643,7 @@ pathDefaults:
# a regular expression.
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
- runOnReady:
+ runOnReady: curl http://localhost:5936/live-video-start -F stream=$MTX_PATH
# Restart the command if it exits.
runOnReadyRestart: no
# Command to run when the stream is not available anymore. |
|
} | ||
|
||
func (sm *StreamManager) handlePost(w http.ResponseWriter, r *http.Request) { | ||
stream := sm.getOrCreateStream(r.PathValue("streamName")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC here we'll need to handle payment (or at least check have some check if the segment is covered). Is it correct @j0sh ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly yes, but not in this exact spot because not all streams will have payments, such as the result stream.
I can add some middleware for the O to handle incoming segments and you can add in payments from there, eg by modifying outgoing headers. But if you are thinking about an out-of-band payments mechanism then the middleware can probably be skipped for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, even if we do the payment outside of this flow, then we'll need to have somewhere the check "if it's paid" in order to stop sending segments if no payment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep - payments outside the flow can be a little simpler, eg a local (in-memory) subscriber rather than middleware that modifies the stream. Then you can gather any needed metrics within the subscribe callback and use those to check payments.
BTW if we do payments by "time" then it might not even be needed!
9e461a7
to
e8f2568
Compare