Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a shared semaphore implementation #368

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 153 additions & 1 deletion source/vibe/core/sync.d
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ shared(ManualEvent) createSharedManualEvent()
return ret;
}

/** Creates a new semaphore object.

These implementations are a task/fiber compatible replacement for `core.sync.semaphore`.
*/
LocalTaskSemaphore createTaskSemaphore(int max_locks)
{
return new LocalTaskSemaphore(max_locks);
}
/// ditto
shared(TaskSemaphore) createSharedTaskSemaphore(int max_locks)
{
return new shared TaskSemaphore(max_locks);
}


/** Performs RAII based locking/unlocking of a mutex.

Expand Down Expand Up @@ -198,7 +212,6 @@ unittest {
});
}


/**
Thread-local semaphore implementation for tasks.

Expand Down Expand Up @@ -321,6 +334,145 @@ final class LocalTaskSemaphore
}
}

/**
Thread-local semaphore implementation for tasks.

When the semaphore runs out of concurrent locks, it will suspend. This class
is used in `vibe.core.connectionpool` to limit the number of concurrent
connections.
*/
final shared class TaskSemaphore
{
// requires a queue
import std.container.binaryheap;
import std.container.array;
//import vibe.utils.memory;

private {
static struct ThreadWaiter {
ubyte priority;
uint seq;
}

static struct State {
BinaryHeap!(Array!ThreadWaiter, asc) waiters;
uint maxLocks;
uint locks;
uint seq;
}

shared(TaskMutex) m_mutex;
shared(TaskCondition) m_condition;
shared(vibe.core.sync.Monitor!(State, shared(TaskMutex))) m_state;
}

@safe shared nothrow:

this(uint max_locks)
{
m_mutex = new shared TaskMutex;
static if (__VERSION__ >= 2093) {
m_condition = new shared TaskCondition(m_mutex);
} else {
m_condition = () @trusted { return cast(shared)new TaskCondition(cast()m_mutex); } ();
}
m_state = createMonitor!State(m_mutex);
scope st = m_state.lock;
st.maxLocks = max_locks;
}

/// Maximum number of concurrent locks
@property void maxLocks(uint max_locks) { m_state.lock.maxLocks = max_locks; }
/// ditto
@property uint maxLocks() const { return m_state.lock.maxLocks; }

/// Number of concurrent locks still available
@property uint available() const { scope st = m_state.lock; return st.maxLocks - st.locks; }

/** Try to acquire a lock.

If a lock cannot be acquired immediately, returns `false` and leaves the
semaphore in its previous state.

Returns:
`true` is returned $(I iff) the number of available locks is greater
than one.
*/
bool tryLock()
{
scope st = m_state.lock;
if (st.locks < st.maxLocks) {
st.locks++;
return true;
}
return false;
}

/** Acquires a lock.

Once the limit of concurrent locks is reached, this method will block
until the number of locks drops below the limit.
*/
void lock(ubyte priority = 0)
{
import std.algorithm.comparison : min;

if (tryLock())
return;

scope st = m_state.lock;

ThreadWaiter w;
w.priority = priority;
w.seq = min(0, st.seq - w.priority);
if (++st.seq == uint.max)
rewindSeq(st);

() @trusted { st.waiters.insert(w); } ();

while (true) {
m_condition.wait();
// NOTE: BinaryHeap.front is not nothrow on older compiler versions
try if (st.waiters.front.seq == w.seq && st.locks < st.maxLocks) {
st.locks++;
return;
}
catch (Exception e) assert(false, e.msg);
}
}

/** Gives up an existing lock.
*/
void unlock()
{
scope st = m_state.lock;
assert(st.locks >= 1);
st.locks--;
if (st.waiters.length > 0)
m_condition.notifyAll();
}

// if true, a goes after b. ie. b comes out front()
/// private
static bool asc(ref ThreadWaiter a, ref ThreadWaiter b)
{
if (a.priority != b.priority)
return a.priority < b.priority;
return a.seq > b.seq;
}

private void rewindSeq(ref typeof(m_state).Locked st)
@trusted {
Array!ThreadWaiter waiters = st.waiters.release();
ushort min_seq;
import std.algorithm : min;
foreach (ref waiter; waiters[])
min_seq = min(waiter.seq, min_seq);
foreach (ref waiter; waiters[])
waiter.seq -= min_seq;
st.waiters.assume(waiters);
}
}

/**
Mutex implementation for fibers.
Expand Down