Skip to content

Commit

Permalink
WIP multi thread worker usage
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc committed Mar 21, 2024
1 parent d4c4dd6 commit 23cab16
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 3 deletions.
3 changes: 2 additions & 1 deletion node/src/test/test-Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ afterEach(async () => {
}
});

test('transport.consume() succeeds', async () => {
console.log('REMOVE test.only');

Check failure on line 255 in node/src/test/test-Consumer.ts

View workflow job for this annotation

GitHub Actions / ci (macos-14, 20)

Unexpected console statement
test.only('transport.consume() succeeds', async () => {
const onObserverNewConsumer1 = jest.fn();

ctx.webRtcTransport2!.observer.once('newconsumer', onObserverNewConsumer1);
Expand Down
5 changes: 4 additions & 1 deletion worker/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,11 @@ fn main() {
.env("PYTHONPATH", &pythonpath)
.env("MEDIASOUP_OUT_DIR", &mediasoup_out_dir)
.env("MEDIASOUP_BUILDTYPE", build_type)
// Force forward slashes on Windows too, otherwise Meson thinks path is not absolute 🤷
// Force forward slashes on Windows too, otherwise Meson thinks path is
// not absolute 🤷.
.env("MEDIASOUP_INSTALL_DIR", &out_dir.replace('\\', "/"))
// In Rust we want to enable worker multi-thread usage.
.env("MEDIASOUP_ENABLE_MULTITHREAD", "true")
.spawn()
.expect("Failed to start")
.wait()
Expand Down
30 changes: 30 additions & 0 deletions worker/include/SCTP/UsrSctpChecker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef MS_SCTP_USRSCTP_CHECKER_HPP
#define MS_SCTP_USRSCTP_CHECKER_HPP

#include "common.hpp"
#include "handles/TimerHandle.hpp"
#include <uv.h>

namespace SCTP
{
class UsrSctpChecker : public TimerHandle::Listener
{
public:
UsrSctpChecker();
~UsrSctpChecker() override;

public:
void Start();
void Stop();

/* Pure virtual methods inherited from TimerHandle::Listener. */
public:
void OnTimer(TimerHandle* timer) override;

private:
TimerHandle* timer{ nullptr };
uint64_t lastCalledAtMs{ 0u };
};
} // namespace SCTP

#endif
8 changes: 7 additions & 1 deletion worker/include/Worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
#include "RTC/Shared.hpp"
#include "RTC/WebRtcServer.hpp"
#include "handles/SignalHandle.hpp"
#include "handles/TimerHandle.hpp"
#include <flatbuffers/flatbuffer_builder.h>
#include <absl/container/flat_hash_map.h>
#include <string>

class Worker : public Channel::ChannelSocket::Listener,
public SignalHandle::Listener,
public RTC::Router::Listener
public RTC::Router::Listener,
public TimerHandle::Listener
{
public:
explicit Worker(Channel::ChannelSocket* channel);
Expand Down Expand Up @@ -49,6 +51,10 @@ class Worker : public Channel::ChannelSocket::Listener,
public:
RTC::WebRtcServer* OnRouterNeedWebRtcServer(RTC::Router* router, std::string& webRtcServerId) override;

/* Pure virtual methods inherited from TimerHandle::Listener. */
public:
void OnTimer(TimerHandle* timer) override;

private:
// Passed by argument.
Channel::ChannelSocket* channel{ nullptr };
Expand Down
7 changes: 7 additions & 0 deletions worker/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ common_sources = [
'src/RTC/RTCP/XR.cpp',
'src/RTC/RTCP/XrDelaySinceLastRr.cpp',
'src/RTC/RTCP/XrReceiverReferenceTime.cpp',
'src/SCTP/UsrSctpChecker.cpp',
]

openssl_proj = subproject(
Expand Down Expand Up @@ -298,6 +299,12 @@ if host_machine.system() == 'linux' and not get_option('ms_disable_liburing')
endif
endif

if get_option('ms_enable_multithread')
cpp_args += [
'-DMS_MULTITHREAD_ENABLED',
]
endif

libmediasoup_worker = library(
'libmediasoup-worker',
name_prefix: '',
Expand Down
1 change: 1 addition & 0 deletions worker/meson_options.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ option('ms_log_trace', type : 'boolean', value : false, description : 'When set
option('ms_log_file_line', type : 'boolean', value : false, description : 'When set to true, all the logging macros print more verbose information, including current file and line')
option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each RTP packet')
option('ms_disable_liburing', type : 'boolean', value : false, description : 'When set to true, disables liburing integration despite current host supports it')
option('ms_enable_multithread', type : 'boolean', value : false, description : 'When set to true, mediasoup worker is built assuming multi-thread usage')
74 changes: 74 additions & 0 deletions worker/src/SCTP/UsrSctpChecker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* NOTE: The UsrSctpChecker singleton must run in its own thread so it cannot
* use Logger.hpp since Logger communicates via UnixSocket with Node or Rust
* and there must be a single thread writing into that socket in worker side.
*/

#include "SCTP/UsrSctpChecker.hpp"
#include "DepLibUV.hpp"
#include <usrsctp.h>

namespace SCTP
{
/* Static. */

static constexpr size_t CheckerInterval{ 10u }; // In ms.

/* UsrSctpChecker instance methods. */

UsrSctpChecker::UsrSctpChecker()
{
this->timer = new TimerHandle(this);

DepLibUV::RunLoop();
}

UsrSctpChecker::~UsrSctpChecker()
{
delete this->timer;
}

void UsrSctpChecker::Start()
{
this->lastCalledAtMs = 0u;

this->timer->Start(CheckerInterval, CheckerInterval);
}

void UsrSctpChecker::Stop()
{
this->lastCalledAtMs = 0u;

this->timer->Stop();
}

void UsrSctpChecker::OnTimer(TimerHandle* /*timer*/)
{
auto nowMs = DepLibUV::GetTimeMs();
const int elapsedMs = this->lastCalledAtMs ? static_cast<int>(nowMs - this->lastCalledAtMs) : 0;

// TODO: This must run in the worker thread obviously. How3ver this is not easy
// at all. Note that usrsctp_handle_timers() may trigger many calls to the
// usrsctp onSendSctpData() callback, but each of those call may be intended
// for a SctpAssociation in whatever other worker/thread, so we cannot just
// try to group all them together.
// #ifdef MS_LIBURING_SUPPORTED
// Activate liburing usage.
// 'usrsctp_handle_timers()' will synchronously call the send/recv
// callbacks for the pending data. If there are multiple messages to be
// sent over the network then we will send those messages within a single
// system call.
// DepLibUring::SetActive();
// #endif

usrsctp_handle_timers(elapsedMs);

// TODO: This must run in the worker thread obviously.
// #ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
// DepLibUring::Submit();
// #endif

this->lastCalledAtMs = nowMs;
}
} // namespace SCTP
15 changes: 15 additions & 0 deletions worker/src/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include "FBS/response.h"
#include "FBS/worker.h"

// TODO: REMOVE
#include <fstream>

/* Instance methods. */

Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel)
Expand Down Expand Up @@ -48,6 +51,9 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel)
this->shared->channelNotifier->Emit(
std::to_string(Logger::Pid), FBS::Notification::Event::WORKER_RUNNING);

auto* timer = new TimerHandle(this);
// timer->Start(1000, 1000);

MS_DEBUG_DEV("starting libuv loop");
DepLibUV::RunLoop();
MS_DEBUG_DEV("libuv loop ended");
Expand Down Expand Up @@ -547,3 +553,12 @@ inline RTC::WebRtcServer* Worker::OnRouterNeedWebRtcServer(

return webRtcServer;
}

void Worker::OnTimer(TimerHandle* /*timer*/)
{
MS_DUMP_STD("---- Worker::OnTimer()");
std::ofstream outfile;
outfile.open("/tmp/ms_log.txt", std::ios_base::app);
outfile << "---- Worker::OnTimer()\n";
outfile.flush();
}
4 changes: 4 additions & 0 deletions worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ def setup(ctx, meson_args=MESON_ARGS):
"""
Run meson setup
"""
enable_multithread = os.getenv('MEDIASOUP_ENABLE_MULTITHREAD');
if enable_multithread:
meson_args += ' -Dms_enable_multithread=true';

if MEDIASOUP_BUILDTYPE == 'Release':
with ctx.cd(f'"{WORKER_DIR}"'):
ctx.run(
Expand Down

0 comments on commit 23cab16

Please sign in to comment.