Skip to content

Commit

Permalink
Fix and streamline semaphore behavior.
Browse files Browse the repository at this point in the history
s-ludwig committed Nov 24, 2023
1 parent 7d17ce6 commit 1624d9b
Showing 1 changed file with 104 additions and 33 deletions.
137 changes: 104 additions & 33 deletions source/vibe/core/sync.d
Original file line number Diff line number Diff line change
@@ -221,27 +221,27 @@ unittest {
*/
final class LocalTaskSemaphore
{
@safe:

// requires a queue
import std.container.binaryheap;
import std.container.binaryheap; // priority queue
import std.container.array;
//import vibe.utils.memory;

private {
static struct ThreadWaiter {
ubyte priority;
byte priority;
uint seq;
ulong id;
}

BinaryHeap!(Array!ThreadWaiter, asc) m_waiters;
uint m_maxLocks;
uint m_locks;
uint m_seq;
ulong m_idCounter;
LocalManualEvent m_signal;
}

this(uint max_locks) nothrow
@safe nothrow:

this(uint max_locks)
{
m_maxLocks = max_locks;
m_signal = createManualEvent();
@@ -250,7 +250,7 @@ final class LocalTaskSemaphore
/// Maximum number of concurrent locks
@property void maxLocks(uint max_locks) { m_maxLocks = max_locks; }
/// ditto
@property uint maxLocks() const nothrow { return m_maxLocks; }
@property uint maxLocks() const { return m_maxLocks; }

/// Number of concurrent locks still available
@property uint available() const { return m_maxLocks - m_locks; }
@@ -266,8 +266,7 @@ final class LocalTaskSemaphore
*/
bool tryLock()
{
if (available > 0)
{
if (available > 0) {
m_locks++;
return true;
}
@@ -278,38 +277,52 @@ final class LocalTaskSemaphore
Once the limit of concurrent locks is reached, this method will block
until the number of locks drops below the limit.
Params:
priority = Optional priority modifier - any lock requests with a
higher priority will be served before all requests with a lower
priority, FIFO order is applied within a priority class.
*/
void lock(ubyte priority = 0)
void lock(byte priority = 0)
{
import std.algorithm.comparison : min;

if (tryLock())
return;

auto ec = m_signal.emitCount;

if (m_seq == uint.max) {
rewindSeq();
assert(m_seq != uint.max, "Semaphore queue overflow");
}

ThreadWaiter w;
w.priority = priority;
w.seq = min(0, m_seq - w.priority);
if (++m_seq == uint.max)
rewindSeq();
w.seq = m_seq++;
w.id = m_idCounter++;

() @trusted { m_waiters.insert(w); } ();

while (true) {
m_signal.waitUninterruptible();
if (m_waiters.front.seq == w.seq && tryLock()) {
ec = m_signal.waitUninterruptible(ec);
// NOTE: BinaryHeap.front is not nothrow on older compiler versions
try if (m_waiters.front.id == w.id && tryLock()) {
() @trusted { m_waiters.removeFront(); } ();
return;
}
catch (Exception e) assert(false, e.msg);
}
}

/** Gives up an existing lock.
*/
void unlock()
{
assert(m_locks >= 1);
assert(m_locks >= 1, "Unlocking semaphore with no active locks");
m_locks--;
if (m_waiters.length > 0)
m_signal.emit(); // resume one
m_signal.emit(); // notify waiters so that the next one can resume
}

// if true, a goes after b. ie. b comes out front()
@@ -323,13 +336,15 @@ final class LocalTaskSemaphore

private void rewindSeq()
@trusted {
Array!ThreadWaiter waiters = m_waiters.release();
ushort min_seq;
import std.algorithm : min;

Array!ThreadWaiter waiters = m_waiters.release();
auto min_seq = m_seq;
foreach (ref waiter; waiters[])
min_seq = min(waiter.seq, min_seq);
foreach (ref waiter; waiters[])
waiter.seq -= min_seq;
m_seq -= min_seq;
m_waiters.assume(waiters);
}
}
@@ -343,22 +358,22 @@ final class LocalTaskSemaphore
*/
final shared class TaskSemaphore
{
// requires a queue
import std.container.binaryheap;
import std.container.binaryheap; // priority queue
import std.container.array;
//import vibe.utils.memory;

private {
static struct ThreadWaiter {
ubyte priority;
byte priority;
uint seq;
ulong id;
}

static struct State {
BinaryHeap!(Array!ThreadWaiter, asc) waiters;
uint maxLocks;
uint locks;
uint seq;
ulong idCounter;
}

shared(TaskMutex) m_mutex;
@@ -412,8 +427,13 @@ final shared class TaskSemaphore
Once the limit of concurrent locks is reached, this method will block
until the number of locks drops below the limit.
Params:
priority = Optional priority modifier - any lock requests with a
higher priority will be served before all requests with a lower
priority, FIFO order is applied within a priority class.
*/
void lock(ubyte priority = 0)
void lock(byte priority = 0)
{
import std.algorithm.comparison : min;

@@ -422,18 +442,23 @@ final shared class TaskSemaphore

scope st = m_state.lock;

if (st.seq == uint.max) {
rewindSeq(st);
assert(st.seq != uint.max, "Semaphore queue overflow");
}

ThreadWaiter w;
w.priority = priority;
w.seq = min(0, st.seq - w.priority);
if (++st.seq == uint.max)
rewindSeq(st);
w.seq = st.seq++;
w.id = st.idCounter++;

() @trusted { st.waiters.insert(w); } ();

while (true) {
m_condition.wait();
// NOTE: BinaryHeap.front is not nothrow on older compiler versions
try if (st.waiters.front.seq == w.seq && st.locks < st.maxLocks) {
try if (st.waiters.front.id == w.id && st.locks < st.maxLocks) {
() @trusted { st.waiters.removeFront(); } ();
st.locks++;
return;
}
@@ -446,7 +471,7 @@ final shared class TaskSemaphore
void unlock()
{
scope st = m_state.lock;
assert(st.locks >= 1);
assert(st.locks >= 1, "Unlocking semaphore with no active locks");
st.locks--;
if (st.waiters.length > 0)
m_condition.notifyAll();
@@ -461,19 +486,65 @@ final shared class TaskSemaphore
return a.seq > b.seq;
}

private void rewindSeq(ref typeof(m_state).Locked st)
private void rewindSeq(scope ref typeof(m_state).Locked st)
@trusted {
Array!ThreadWaiter waiters = st.waiters.release();
ushort min_seq;
import std.algorithm : min;

Array!ThreadWaiter waiters = st.waiters.release();
auto min_seq = st.seq;
foreach (ref waiter; waiters[])
min_seq = min(waiter.seq, min_seq);
foreach (ref waiter; waiters[])
waiter.seq -= min_seq;
st.seq -= min_seq;
st.waiters.assume(waiters);
}
}

unittest {
import vibe.core.core : runTask, sleep;

void test(S)(S sem)
{
assert(sem.available == 2);
assert(sem.tryLock());
assert(sem.available == 1);
assert(sem.tryLock());
assert(sem.available == 0);

int seq = 0;
auto t1 = runTask({
sem.lock(0);
assert(seq++ == 2);
sem.lock(2);
assert(seq++ == 3);
sem.lock(-1);
assert(seq++ == 6);
});
auto t2 = runTask({
sem.lock(1);
assert(seq++ == 0);
sem.lock(1);
assert(seq++ == 1);
sem.lock(-1);
assert(seq++ == 4);
sem.lock(0);
assert(seq++ == 5);
});
foreach (i; 0 .. 9) {
sleep(10.msecs);
sem.unlock();
}
assert(sem.available == 2);
t1.join();
t2.join();
}

test(createTaskSemaphore(2));
test(createSharedTaskSemaphore(2));
}


/**
Mutex implementation for fibers.

0 comments on commit 1624d9b

Please sign in to comment.