Skip to content

Commit

Permalink
Merge pull request #370 from vibe-d/improvements
Browse files Browse the repository at this point in the history
Task/thread improvements
  • Loading branch information
l-kramer authored Dec 14, 2023
2 parents d9d82ce + 07a084a commit 83e8d29
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 52 deletions.
2 changes: 1 addition & 1 deletion dub.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ authors "Sönke Ludwig"
copyright "Copyright © 2016-2020, Sönke Ludwig"
license "MIT"

dependency "eventcore" version="~>0.9.17"
dependency "eventcore" version="~>0.9.27"
dependency "vibe-container" version="~>1.0"

targetName "vibe_core"
Expand Down
25 changes: 24 additions & 1 deletion source/vibe/core/channel.d
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ enum ChannelPriority {
struct Channel(T, size_t buffer_size = 100) {
enum bufferSize = buffer_size;

private shared ChannelImpl!(T, buffer_size) m_impl;
private shared(ChannelImpl!(T, buffer_size)) m_impl;

this(this) @safe { if (m_impl) m_impl.addRef(); }
~this() @safe { if (m_impl) m_impl.releaseRef(); }

/** Determines whether there is more data to read in a single-reader scenario.
Expand Down Expand Up @@ -160,6 +163,7 @@ private final class ChannelImpl(T, size_t buffer_size) {
FixedRingBuffer!(T, buffer_size) m_items;
bool m_closed = false;
ChannelConfig m_config;
int m_refCount = 1;
}

this(ChannelConfig config)
Expand All @@ -169,6 +173,25 @@ private final class ChannelImpl(T, size_t buffer_size) {
m_config = config;
}

private void addRef()
@safe nothrow shared {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
thisus.m_refCount++;
}

private void releaseRef()
@safe nothrow shared {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
if (--thisus.m_refCount == 0) {
try () @trusted { destroy(m_condition); } ();
catch (Exception e) assert(false);
}
}

@property bool empty()
shared nothrow {
{
Expand Down
18 changes: 11 additions & 7 deletions source/vibe/core/core.d
Original file line number Diff line number Diff line change
Expand Up @@ -773,10 +773,10 @@ public void setupWorkerThreads(uint num = 0)
scope (exit) st_threadsMutex.unlock_nothrow();

if (!st_workerPool)
st_workerPool = new shared TaskPool(num);
st_workerPool = new shared TaskPool(num, "vibe-worker");

if (!st_ioWorkerPool)
st_ioWorkerPool = new shared TaskPool(3);
st_ioWorkerPool = new shared TaskPool(3, "vibe-io");
} ();
}

Expand Down Expand Up @@ -1845,6 +1845,8 @@ shared static this()

shared static ~this()
{
destroy(st_threadsSignal);

shutdownDriver();

size_t tasks_left = s_scheduler.scheduledTaskCount;
Expand Down Expand Up @@ -1887,6 +1889,13 @@ static ~this()
shutdownWorkerPool();
}

foreach (f; s_availableFibers) {
f.shutdown();
destroy(f);
}

ManualEvent.freeThreadResources();

synchronized (st_threadsMutex) {
auto idx = st_threads.countUntil!(c => c.thread is thisthr);
assert(idx >= 0, "No more threads registered");
Expand Down Expand Up @@ -1927,11 +1936,6 @@ nothrow {

private void shutdownDriver()
{
if (ManualEvent.ms_threadEvent != EventID.init) {
eventDriver.events.releaseRef(ManualEvent.ms_threadEvent);
ManualEvent.ms_threadEvent = EventID.init;
}

static if (is(typeof(tryGetEventDriver()))) {
// avoid creating an event driver on threads that don't actually have one
if (auto drv = tryGetEventDriver())
Expand Down
78 changes: 38 additions & 40 deletions source/vibe/core/sync.d
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,6 @@ struct LocalManualEvent {
Waiter m_waiter;
}

// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;

private void initialize()
nothrow {
import vibe.internal.allocator : Mallocator, makeGCSafe;
Expand Down Expand Up @@ -1345,14 +1342,11 @@ struct ManualEvent {
int m_emitCount;
static struct Waiters {
StackSList!ThreadWaiter active; // actively waiting
StackSList!ThreadWaiter free; // free-list of reusable waiter structs
}
Monitor!(Waiters, shared(Mutex)) m_waiters;
static StackSList!ThreadWaiter s_free; // free-list of reusable waiter structs for the calling thread
}

// thread destructor in vibe.core.core will decrement the ref. count
package static EventID ms_threadEvent;

enum EmitMode {
single,
all
Expand All @@ -1365,6 +1359,15 @@ struct ManualEvent {
m_waiters.initialize(new shared Mutex);
}

package static void freeThreadResources()
{
s_free.filter((w) @trusted {
try destroy(w);
catch (Exception e) assert(false, e.msg);
return false;
});
}

deprecated("ManualEvent is always non-null!")
bool opCast (T : bool) () const shared nothrow { return true; }

Expand Down Expand Up @@ -1483,11 +1486,6 @@ struct ManualEvent {

() @trusted { logTrace("wait shared %s", cast(void*)&this); } ();

if (ms_threadEvent is EventID.invalid) {
ms_threadEvent = eventDriver.events.create();
assert(ms_threadEvent != EventID.invalid, "Failed to create event!");
}

MonoTime target_timeout, now;
if (timeout != Duration.max) {
try now = MonoTime.currTime();
Expand All @@ -1499,7 +1497,7 @@ struct ManualEvent {

acquireThreadWaiter((scope ThreadWaiter w) {
while (ec - emit_count <= 0) {
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, ms_threadEvent, () => (this.emitCount - emit_count) > 0);
w.wait!interruptible(timeout != Duration.max ? target_timeout - now : Duration.max, w.m_event, () => (this.emitCount - emit_count) > 0);
ec = this.emitCount;

if (timeout != Duration.max) {
Expand All @@ -1515,8 +1513,6 @@ struct ManualEvent {

private void acquireThreadWaiter(DEL)(scope DEL del)
shared {
import vibe.internal.allocator : processAllocator, makeGCSafe;

ThreadWaiter w;
auto drv = eventDriver;

Expand All @@ -1531,25 +1527,14 @@ struct ManualEvent {
});

if (!w) {
free.filter((fw) {
if (fw.m_driver is drv) {
w = fw;
w.addRef();
return false;
}
return true;
});

if (!w) {
() @trusted {
try {
w = processAllocator.makeGCSafe!ThreadWaiter;
w.m_driver = drv;
w.m_event = ms_threadEvent;
} catch (Exception e) {
assert(false, "Failed to allocate thread waiter.");
}
} ();
if (!s_free.empty) {
w = s_free.first;
s_free.remove(w);
assert(w.m_refCount == 0);
assert(w.m_driver is drv);
w.addRef();
} else {
w = new ThreadWaiter;
}

assert(w.m_refCount == 1);
Expand All @@ -1570,9 +1555,10 @@ struct ManualEvent {
with (m_waiters.lock) {
auto rmvd = active.remove(w);
assert(rmvd, "Waiter not in active queue anymore!?");
free.add(w);
// TODO: cap size of m_freeWaiters
}
assert(w.m_refCount == 0);
s_free.add(w);
// TODO: cap size of m_freeWaiters
}
}
}
Expand Down Expand Up @@ -1761,7 +1747,7 @@ shared struct Monitor(T, M)


private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
import vibe.internal.list : CircularDList;
import vibe.internal.list : CircularDList, StackSList;

private {
static struct TaskWaiter {
Expand All @@ -1777,7 +1763,7 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
}

static if (EVENT_TRIGGERED) {
package(vibe) ThreadLocalWaiter next; // queue of other waiters of the same thread
package(vibe) ThreadLocalWaiter next; // queue of other waiters in the active/free list of the manual event
NativeEventDriver m_driver;
EventID m_event = EventID.invalid;
} else {
Expand All @@ -1792,15 +1778,27 @@ private final class ThreadLocalWaiter(bool EVENT_TRIGGERED) {
this()
{
m_waiters = CircularDList!(TaskWaiter*)(() @trusted { return &m_pivot; } ());
static if (EVENT_TRIGGERED) {
m_driver = eventDriver;
m_event = m_driver.events.create();
assert(m_event != EventID.invalid, "Failed to create event!");
}
}

static if (EVENT_TRIGGERED) {
~this()
{
import vibe.core.internal.release : releaseHandle;

if (m_event != EventID.invalid)
releaseHandle!"events"(m_event, () @trusted { return cast(shared)m_driver; } ());
if (m_event != EventID.invalid) {
import core.stdc.stdlib : abort;
if (m_driver !is eventDriver) {
abort();
assert(false, "ThreadWaiter destroyed in foreign thread");
}
m_driver.events.releaseRef(m_event);
m_event = EventID.invalid;
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions source/vibe/core/task.d
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ enum TaskEvent {
preStart, /// Just about to invoke the fiber which starts execution
postStart, /// After the fiber has returned for the first time (by yield or exit)
start, /// Just about to start execution
schedule, /// Scheduled for execution
yield, /// Temporarily paused
resume, /// Resumed from a prior yield
end, /// Ended normally
Expand Down Expand Up @@ -999,6 +1000,8 @@ package struct TaskScheduler {

if (t == thist) return;

debug if (TaskFiber.ms_taskEventCallback) () @trusted { TaskFiber.ms_taskEventCallback(TaskEvent.schedule, t); } ();

auto thisthr = thist ? thist.thread : () @trusted { return Thread.getThis(); } ();
assert(t.thread is thisthr, "Cannot switch to a task that lives in a different thread.");

Expand Down
7 changes: 5 additions & 2 deletions source/vibe/core/taskpool.d
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ shared final class TaskPool {
Params:
thread_count = The number of worker threads to create
*/
this(size_t thread_count = logicalProcessorCount())
this(size_t thread_count = logicalProcessorCount(), string thread_name_prefix = "vibe")
@safe nothrow {
import std.format : format;

Expand All @@ -52,7 +52,7 @@ shared final class TaskPool {
WorkerThread thr;
() @trusted nothrow {
thr = new WorkerThread(this);
try thr.name = format("vibe-%s", i);
try thr.name = format("%s-%s", thread_name_prefix, i);
catch (Exception e) logException(e, "Failed to set worker thread name");
thr.start();
} ();
Expand Down Expand Up @@ -95,6 +95,8 @@ shared final class TaskPool {

size_t cnt = m_state.lock.queue.length;
if (cnt > 0) logWarn("There were still %d worker tasks pending at exit.", cnt);

destroy(m_signal);
}

/** Instructs all worker threads to terminate as soon as all tasks have
Expand Down Expand Up @@ -209,6 +211,7 @@ shared final class TaskPool {
static void taskFun(Channel!Task ch, FT func, ARGS args) {
try ch.put(Task.getThis());
catch (Exception e) assert(false, e.msg);
ch = Channel!Task.init;
mixin(callWithMove!ARGS("func", "args"));
}
runTask_unsafe(settings, &taskFun, ch, func, args);
Expand Down
4 changes: 3 additions & 1 deletion source/vibe/internal/interfaceproxy.d
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ struct InterfaceProxy(I) if (is(I == interface)) {
foreach (idx, F; Overloads) {
enum attribs = functionAttributeString!F(false);
enum is_prop = functionAttributes!F & FunctionAttribute.property;
ret ~= attribs~" ReturnType!(Overloads["~idx.stringof~"]) "~member~"("~parameterDecls!(F, idx)~") { return m_intf."~member~"(m_value, "~parameterNames!F~"); }";
ret ~= attribs~" ReturnType!(Overloads["~idx.stringof~"]) "~member~"("~parameterDecls!(F, idx)~")"
~ "{ assert(!!m_intf, \"Accessing null \"~I.stringof~\" interface proxy\");"
~ "return m_intf."~member~"(m_value, "~parameterNames!F~"); }";
}
return ret;
}
Expand Down

0 comments on commit 83e8d29

Please sign in to comment.