From 07a084adcf75e3c5f3781240b7137eaef654b3f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Wed, 13 Dec 2023 16:52:23 +0100 Subject: [PATCH] Create a thread event object per ManualEvent thread waiter The idea behind using a shared per-thread event object for all ManualEvent instances was to save resources. However, the overhead can become quite excessive in situations where many ManualEvents are in a wait state, because all of them will be resumed whenever one of them gets emitted. Resource optimization should instead be done in eventcore where necessary. The free list of thread waiters is now stored in TLS memory, shared for all ManualEvents, instead of individually for each ManualEvent. --- source/vibe/core/core.d | 14 +++++--- source/vibe/core/sync.d | 78 ++++++++++++++++++++--------------------- 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index 60f30114..74e67723 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1845,6 +1845,8 @@ shared static this() shared static ~this() { + destroy(st_threadsSignal); + shutdownDriver(); size_t tasks_left = s_scheduler.scheduledTaskCount; @@ -1887,6 +1889,13 @@ static ~this() shutdownWorkerPool(); } + foreach (f; s_availableFibers) { + f.shutdown(); + destroy(f); + } + + ManualEvent.freeThreadResources(); + synchronized (st_threadsMutex) { auto idx = st_threads.countUntil!(c => c.thread is thisthr); assert(idx >= 0, "No more threads registered"); @@ -1927,11 +1936,6 @@ nothrow { private void shutdownDriver() { - if (ManualEvent.ms_threadEvent != EventID.init) { - eventDriver.events.releaseRef(ManualEvent.ms_threadEvent); - ManualEvent.ms_threadEvent = EventID.init; - } - static if (is(typeof(tryGetEventDriver()))) { // avoid creating an event driver on threads that don't actually have one if (auto drv = tryGetEventDriver()) diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d index 26dc7d81..c2484569 100644 --- a/source/vibe/core/sync.d +++ b/source/vibe/core/sync.d @@ -1139,9 +1139,6 @@ struct LocalManualEvent { Waiter m_waiter; } - // thread destructor in vibe.core.core will decrement the ref. count - package static EventID ms_threadEvent; - private void initialize() nothrow { import vibe.internal.allocator : Mallocator, makeGCSafe; @@ -1345,14 +1342,11 @@ struct ManualEvent { int m_emitCount; static struct Waiters { StackSList!ThreadWaiter active; // actively waiting - StackSList!ThreadWaiter free; // free-list of reusable waiter structs } Monitor!(Waiters, shared(Mutex)) m_waiters; + static StackSList!ThreadWaiter s_free; // free-list of reusable waiter structs for the calling thread } - // thread destructor in vibe.core.core will decrement the ref. count - package static EventID ms_threadEvent; - enum EmitMode { single, all @@ -1365,6 +1359,15 @@ struct ManualEvent { m_waiters.initialize(new shared Mutex); } + package static void freeThreadResources() + { + s_free.filter((w) @trusted { + try destroy(w); + catch (Exception e) assert(false, e.msg); + return false; + }); + } + deprecated("ManualEvent is always non-null!") bool opCast (T : bool) () const shared nothrow { return true; } @@ -1483,11 +1486,6 @@ struct ManualEvent { () @trusted { logTrace("wait shared %s", cast(void*)&this); } (); - if (ms_threadEvent is EventID.invalid) { - ms_threadEvent = eventDriver.events.create(); - assert(ms_threadEvent != EventID.invalid, "Failed to create event!"); - } - MonoTime target_timeout, now; if (timeout != Duration.max) { try now = MonoTime.currTime(); @@ -1499,7 +1497,7 @@ struct ManualEvent { acquireThreadWaiter((scope ThreadWaiter w) { while (ec - emit_count <= 0) { - w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0); + w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, w.m_event, () => (this.emitCount - emit_count) > 0); ec = this.emitCount; if (timeout != Duration.max) { @@ -1515,8 +1513,6 @@ struct ManualEvent { private void acquireThreadWaiter(DEL)(scope DEL del) shared { - import vibe.internal.allocator : processAllocator, makeGCSafe; - ThreadWaiter w; auto drv = eventDriver; @@ -1531,25 +1527,14 @@ struct ManualEvent { }); if (!w) { - free.filter((fw) { - if (fw.m_driver is drv) { - w = fw; - w.addRef(); - return false; - } - return true; - }); - - if (!w) { - () @trusted { - try { - w = processAllocator.makeGCSafe!ThreadWaiter; - w.m_driver = drv; - w.m_event = ms_threadEvent; - } catch (Exception e) { - assert(false, "Failed to allocate thread waiter."); - } - } (); + if (!s_free.empty) { + w = s_free.first; + s_free.remove(w); + assert(w.m_refCount == 0); + assert(w.m_driver is drv); + w.addRef(); + } else { + w = new ThreadWaiter; } assert(w.m_refCount == 1); @@ -1570,9 +1555,10 @@ struct ManualEvent { with (m_waiters.lock) { auto rmvd = active.remove(w); assert(rmvd, "Waiter not in active queue anymore!?"); - free.add(w); - // TODO: cap size of m_freeWaiters } + assert(w.m_refCount == 0); + s_free.add(w); + // TODO: cap size of m_freeWaiters } } } @@ -1761,7 +1747,7 @@ shared struct Monitor(T, M) private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { - import vibe.internal.list : CircularDList; + import vibe.internal.list : CircularDList, StackSList; private { static struct TaskWaiter { @@ -1777,7 +1763,7 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { } static if (EVENT_TRIGGERED) { - package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread + package(vibe) ThreadLocalWaiter next; // queue of other waiters in the active/free list of the manual event NativeEventDriver m_driver; EventID m_event = EventID.invalid; } else { @@ -1792,6 +1778,11 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { this() { m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ()); + static if (EVENT_TRIGGERED) { + m_driver = eventDriver; + m_event = m_driver.events.create(); + assert(m_event != EventID.invalid, "Failed to create event!"); + } } static if (EVENT_TRIGGERED) { @@ -1799,8 +1790,15 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) { { import vibe.core.internal.release : releaseHandle; - if (m_event != EventID.invalid) - releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ()); + if (m_event != EventID.invalid) { + import core.stdc.stdlib : abort; + if (m_driver !is eventDriver) { + abort(); + assert(false, "ThreadWaiter destroyed in foreign thread"); + } + m_driver.events.releaseRef(m_event); + m_event = EventID.invalid; + } } }