Skip to content

Commit

Permalink
Create a thread event object per ManualEvent thread waiter
Browse files Browse the repository at this point in the history
The idea behind using a shared per-thread event object for all ManualEvent instances was to save resources. However, the overhead can become quite excessive in situations where many ManualEvents are in a wait state, because all of them will be resumed whenever one of them gets emitted.

Resource optimization should instead be done in eventcore where necessary.

The free list of thread waiters is now stored in TLS memory, shared for all ManualEvents, instead of individually for each ManualEvent.
  • Loading branch information
s-ludwig committed Dec 14, 2023
1 parent d5436a4 commit 07a084a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 45 deletions.
14 changes: 9 additions & 5 deletions source/vibe/core/core.d
Original file line number Diff line number Diff line change
Expand Up @@ -1845,6 +1845,8 @@ shared static this()

shared static ~this()
{
destroy(st_threadsSignal);

shutdownDriver();

size_t tasks_left = s_scheduler.scheduledTaskCount;
Expand Down Expand Up @@ -1887,6 +1889,13 @@ static ~this()
shutdownWorkerPool();
}

foreach (f; s_availableFibers) {
f.shutdown();
destroy(f);
}

ManualEvent.freeThreadResources();

synchronized (st_threadsMutex) {
auto idx = st_threads.countUntil!(c => c.thread is thisthr);
assert(idx >= 0, "No more threads registered");
Expand Down Expand Up @@ -1927,11 +1936,6 @@ nothrow {

private void shutdownDriver()
{
if (ManualEvent.ms_threadEvent != EventID.init) {
eventDriver.events.releaseRef(ManualEvent.ms_threadEvent);
ManualEvent.ms_threadEvent = EventID.init;
}

static if (is(typeof(tryGetEventDriver()))) {
// avoid creating an event driver on threads that don't actually have one
if (auto drv = tryGetEventDriver())
Expand Down
78 changes: 38 additions & 40 deletions source/vibe/core/sync.d
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,6 @@ struct LocalManualEvent {
Waiter m_waiter;
}

// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;

private void initialize()
nothrow {
import vibe.internal.allocator : Mallocator, makeGCSafe;
Expand Down Expand Up @@ -1345,14 +1342,11 @@ struct ManualEvent {
int m_emitCount;
static struct Waiters {
StackSList!ThreadWaiter active; // actively waiting
StackSList!ThreadWaiter free; // free-list of reusable waiter structs
}
Monitor!(Waiters, shared(Mutex)) m_waiters;
static StackSList!ThreadWaiter s_free; // free-list of reusable waiter structs for the calling thread
}

// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;

enum EmitMode {
single,
all
Expand All @@ -1365,6 +1359,15 @@ struct ManualEvent {
m_waiters.initialize(new shared Mutex);
}

package static void freeThreadResources()
{
s_free.filter((w) @trusted {
try destroy(w);
catch (Exception e) assert(false, e.msg);
return false;
});
}

deprecated("ManualEvent is always non-null!")
bool opCast (T : bool) () const shared nothrow { return true; }

Expand Down Expand Up @@ -1483,11 +1486,6 @@ struct ManualEvent {

() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();

if (ms_threadEvent is EventID.invalid) {
ms_threadEvent = eventDriver.events.create();
assert(ms_threadEvent != EventID.invalid, "Failed to create event!");
}

MonoTime target_timeout, now;
if (timeout != Duration.max) {
try now = MonoTime.currTime();
Expand All @@ -1499,7 +1497,7 @@ struct ManualEvent {

acquireThreadWaiter((scope ThreadWaiter w) {
while (ec - emit_count <= 0) {
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0);
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, w.m_event, () => (this.emitCount - emit_count) > 0);
ec = this.emitCount;

if (timeout != Duration.max) {
Expand All @@ -1515,8 +1513,6 @@ struct ManualEvent {

private void acquireThreadWaiter(DEL)(scope DEL del)
shared {
import vibe.internal.allocator : processAllocator, makeGCSafe;

ThreadWaiter w;
auto drv = eventDriver;

Expand All @@ -1531,25 +1527,14 @@ struct ManualEvent {
});

if (!w) {
free.filter((fw) {
if (fw.m_driver is drv) {
w = fw;
w.addRef();
return false;
}
return true;
});

if (!w) {
() @trusted {
try {
w = processAllocator.makeGCSafe!ThreadWaiter;
w.m_driver = drv;
w.m_event = ms_threadEvent;
} catch (Exception e) {
assert(false, "Failed to allocate thread waiter.");
}
} ();
if (!s_free.empty) {
w = s_free.first;
s_free.remove(w);
assert(w.m_refCount == 0);
assert(w.m_driver is drv);
w.addRef();
} else {
w = new ThreadWaiter;
}

assert(w.m_refCount == 1);
Expand All @@ -1570,9 +1555,10 @@ struct ManualEvent {
with (m_waiters.lock) {
auto rmvd = active.remove(w);
assert(rmvd, "Waiter not in active queue anymore!?");
free.add(w);
// TODO: cap size of m_freeWaiters
}
assert(w.m_refCount == 0);
s_free.add(w);
// TODO: cap size of m_freeWaiters
}
}
}
Expand Down Expand Up @@ -1761,7 +1747,7 @@ shared struct Monitor(T, M)


private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
import vibe.internal.list : CircularDList;
import vibe.internal.list : CircularDList, StackSList;

private {
static struct TaskWaiter {
Expand All @@ -1777,7 +1763,7 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
}

static if (EVENT_TRIGGERED) {
package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
package(vibe) ThreadLocalWaiter next; // queue of other waiters in the active/free list of the manual event
NativeEventDriver m_driver;
EventID m_event = EventID.invalid;
} else {
Expand All @@ -1792,15 +1778,27 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
this()
{
m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
static if (EVENT_TRIGGERED) {
m_driver = eventDriver;
m_event = m_driver.events.create();
assert(m_event != EventID.invalid, "Failed to create event!");
}
}

static if (EVENT_TRIGGERED) {
~this()
{
import vibe.core.internal.release : releaseHandle;

if (m_event != EventID.invalid)
releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
if (m_event != EventID.invalid) {
import core.stdc.stdlib : abort;
if (m_driver !is eventDriver) {
abort();
assert(false, "ThreadWaiter destroyed in foreign thread");
}
m_driver.events.releaseRef(m_event);
m_event = EventID.invalid;
}
}
}

Expand Down

0 comments on commit 07a084a

Please sign in to comment.