LibPthread: Reimplement semaphores
This implementation does not use locking or condition variables
internally; it's purely based on atomics and futexes.

Notably, concurrent sem_wait() and sem_post() calls can run *completely
in parallel* without slowing each other down, as long as there are empty
slots for them all to succeed without blocking.

Additionally, sem_wait() never executes an atomic operation with release
ordering, and sem_post() never executes an atomic operation with acquire
ordering (unless you count the syscall). This means the compiler and the
hardware are free to reorder code *into* the critical section.
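
For context, here is a minimal caller-side sketch (not part of this commit) of the POSIX semaphore API this file implements; the producer/consumer thread bodies and the ITEMS constant are illustrative assumptions, not code from the repository.

// Illustrative only -- not part of this commit.
#include <pthread.h>
#include <semaphore.h>

static sem_t items;
static constexpr int ITEMS = 1000;

static void* producer(void*)
{
    for (int i = 0; i < ITEMS; ++i)
        sem_post(&items); // Publishes one item; the increment uses release ordering.
    return nullptr;
}

static void* consumer(void*)
{
    for (int i = 0; i < ITEMS; ++i)
        sem_wait(&items); // Sleeps on the futex only when no items are available.
    return nullptr;
}

int main()
{
    sem_init(&items, 0, 0);
    pthread_t p, c;
    pthread_create(&p, nullptr, producer, nullptr);
    pthread_create(&c, nullptr, consumer, nullptr);
    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    sem_destroy(&items);
    return 0;
}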
bugaevc committed Jul 5, 2021
1 parent 73c05fc commit 51d5f56
Showing 2 changed files with 101 additions and 126 deletions.
222 changes: 99 additions & 123 deletions Userland/Libraries/LibPthread/semaphore.cpp
@@ -1,55 +1,37 @@
 /*
  * Copyright (c) 2021, the SerenityOS developers.
  * Copyright (c) 2021, Gunnar Beutner <[email protected]>
+ * Copyright (c) 2021, Sergey Bugaev <[email protected]>
  *
  * SPDX-License-Identifier: BSD-2-Clause
  */
 
+#include <AK/Assertions.h>
+#include <AK/Atomic.h>
+#include <AK/Types.h>
 #include <errno.h>
 #include <semaphore.h>
+#include <serenity.h>
 
-int sem_close(sem_t*)
+// Whether sem_wait() or sem_post() is responsible for waking any sleeping
+// threads.
+static constexpr u32 POST_WAKES = 1 << 31;
+
+sem_t* sem_open(const char*, int, ...)
 {
     errno = ENOSYS;
-    return -1;
+    return nullptr;
 }
 
-int sem_destroy(sem_t* sem)
+int sem_close(sem_t*)
 {
-    auto rc = pthread_mutex_destroy(&sem->mtx);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    rc = pthread_cond_destroy(&sem->cv);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    return 0;
+    errno = ENOSYS;
+    return -1;
 }
 
-int sem_getvalue(sem_t* sem, int* sval)
+int sem_unlink(const char*)
 {
-    auto rc = pthread_mutex_trylock(&sem->mtx);
-
-    if (rc == EBUSY) {
-        *sval = 0;
-        return 0;
-    }
-
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    *sval = sem->value;
-
-    pthread_mutex_unlock(&sem->mtx);
-
-    return 0;
+    errno = ENOSYS;
+    return -1;
 }
 
 int sem_init(sem_t* sem, int shared, unsigned int value)
@@ -64,116 +46,110 @@ int sem_init(sem_t* sem, int shared, unsigned int value)
         return -1;
     }
 
-    auto rc = pthread_mutex_init(&sem->mtx, nullptr);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    rc = pthread_cond_init(&sem->cv, nullptr);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
     sem->value = value;
 
     return 0;
 }
 
-sem_t* sem_open(const char*, int, ...)
+int sem_destroy(sem_t*)
 {
-    errno = ENOSYS;
-    return nullptr;
+    return 0;
 }
 
-int sem_post(sem_t* sem)
+int sem_getvalue(sem_t* sem, int* sval)
 {
-    auto rc = pthread_mutex_lock(&sem->mtx);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    if (sem->value == SEM_VALUE_MAX) {
-        pthread_mutex_unlock(&sem->mtx);
-        errno = EOVERFLOW;
-        return -1;
-    }
-
-    sem->value++;
-
-    rc = pthread_cond_signal(&sem->cv);
-    if (rc != 0) {
-        pthread_mutex_unlock(&sem->mtx);
-        errno = rc;
-        return -1;
-    }
-
-    rc = pthread_mutex_unlock(&sem->mtx);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
+    u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
+    *sval = value & ~POST_WAKES;
     return 0;
 }
 
-int sem_trywait(sem_t* sem)
+int sem_post(sem_t* sem)
 {
-    auto rc = pthread_mutex_lock(&sem->mtx);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    if (sem->value == 0) {
-        pthread_mutex_unlock(&sem->mtx);
-        errno = EAGAIN;
-        return -1;
-    }
-
-    sem->value--;
-
-    rc = pthread_mutex_unlock(&sem->mtx);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
+    u32 value = AK::atomic_fetch_add(&sem->value, 1u, AK::memory_order_release);
+    // Fast path: no need to wake.
+    if (!(value & POST_WAKES)) [[likely]]
+        return 0;
+
+    // Pass the responsibility for waking more threads if more slots become
+    // available later to sem_wait() in the thread we're about to wake, as
+    // opposed to further sem_post() calls that free up those slots.
+    value = AK::atomic_fetch_and(&sem->value, ~POST_WAKES, AK::memory_order_relaxed);
+    // Check if another sem_post() call has handled it already.
+    if (!(value & POST_WAKES)) [[likely]]
+        return 0;
+    int rc = futex_wake(&sem->value, 1);
+    VERIFY(rc == 0);
     return 0;
 }
 
-int sem_unlink(const char*)
+int sem_trywait(sem_t* sem)
 {
-    errno = ENOSYS;
-    return -1;
+    u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
+    u32 count = value & ~POST_WAKES;
+    if (count == 0)
+        return EAGAIN;
+    // Decrement the count without touching the flag.
+    u32 desired = (count - 1) | (value & POST_WAKES);
+    bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire);
+    if (exchanged) [[likely]]
+        return 0;
+    else
+        return EAGAIN;
 }
 
 int sem_wait(sem_t* sem)
 {
-    auto rc = pthread_mutex_lock(&sem->mtx);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    while (sem->value == 0) {
-        rc = pthread_cond_wait(&sem->cv, &sem->mtx);
-        if (rc != 0) {
-            pthread_mutex_unlock(&sem->mtx);
-            errno = rc;
-            return -1;
-        }
-    }
-
-    sem->value--;
-
-    rc = pthread_mutex_unlock(&sem->mtx);
-    if (rc != 0) {
-        errno = rc;
-        return -1;
-    }
-
-    return 0;
+    return sem_timedwait(sem, nullptr);
+}
+
+int sem_timedwait(sem_t* sem, const struct timespec* abstime)
+{
+    u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
+    bool responsible_for_waking = false;
+
+    while (true) {
+        u32 count = value & ~POST_WAKES;
+        if (count > 0) [[likely]] {
+            // It looks like there are some free slots.
+            u32 whether_post_wakes = value & POST_WAKES;
+            bool going_to_wake = false;
+            if (responsible_for_waking && !whether_post_wakes) {
+                // If we have ourselves been woken up previously, and the
+                // POST_WAKES flag is not set, that means some more slots might
+                // be available now, and it's us who has to wake up additional
+                // threads.
+                if (count > 1) [[unlikely]]
+                    going_to_wake = true;
+                // Pass the responsibility for waking up further threads back to
+                // sem_post() calls. In particular, we don't want the threads
+                // we're about to wake to try to wake anyone else.
+                whether_post_wakes = POST_WAKES;
+            }
+            // Now, try to commit this.
+            u32 desired = (count - 1) | whether_post_wakes;
+            bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire);
+            if (!exchanged) [[unlikely]]
+                // Re-evaluate.
+                continue;
+            if (going_to_wake) [[unlikely]] {
+                int rc = futex_wake(&sem->value, count - 1);
+                VERIFY(rc >= 0);
+            }
+            return 0;
+        }
+        // We're probably going to sleep, so attempt to set the flag. We do not
+        // commit to sleeping yet, though, as setting the flag may fail and
+        // cause us to reevaluate what we're doing.
+        if (value == 0) {
+            bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, POST_WAKES, AK::memory_order_relaxed);
+            if (!exchanged) [[unlikely]]
+                // Re-evaluate.
+                continue;
+            value = POST_WAKES;
+        }
+        // At this point, we're committed to sleeping.
+        responsible_for_waking = true;
+        futex_wait(&sem->value, value, abstime, CLOCK_REALTIME);
+        // This is the state we will probably see upon being waked:
+        value = 1;
+    }
 }
5 changes: 2 additions & 3 deletions Userland/Libraries/LibPthread/semaphore.h
@@ -14,9 +14,7 @@
 __BEGIN_DECLS
 
 typedef struct {
-    pthread_mutex_t mtx;
-    pthread_cond_t cv;
-    int value;
+    uint32_t value;
 } sem_t;
 
 int sem_close(sem_t*);
@@ -28,6 +26,7 @@ int sem_post(sem_t*);
 int sem_trywait(sem_t*);
 int sem_unlink(const char*);
 int sem_wait(sem_t*);
+int sem_timedwait(sem_t*, const struct timespec* abstime);
 
 #define SEM_VALUE_MAX INT_MAX
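
The header now also declares sem_timedwait(), which takes an absolute CLOCK_REALTIME deadline (forwarded to futex_wait() in the implementation above). A hypothetical caller-side sketch, not part of this commit, assuming POSIX-style ETIMEDOUT reporting on expiry; the one-second deadline is an arbitrary example value:

// Illustrative only -- assumes POSIX timeout semantics (errno == ETIMEDOUT on expiry).
#include <errno.h>
#include <semaphore.h>
#include <stdio.h>
#include <time.h>

bool wait_with_timeout(sem_t* sem)
{
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline); // sem_timedwait() expects an absolute CLOCK_REALTIME time.
    deadline.tv_sec += 1;
    if (sem_timedwait(sem, &deadline) == 0)
        return true;
    if (errno == ETIMEDOUT)
        fprintf(stderr, "timed out waiting for semaphore\n");
    return false;
}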
