diff --git a/core/include/sched.h b/core/include/sched.h
index f7384236ffcf..5100bb8df299 100644
--- a/core/include/sched.h
+++ b/core/include/sched.h
@@ -172,6 +172,7 @@ typedef enum {
     STATUS_FLAG_BLOCKED_ALL,    /**< waiting for all flags in flag_mask */
     STATUS_MBOX_BLOCKED,        /**< waiting for get/put on mbox */
     STATUS_COND_BLOCKED,        /**< waiting for a condition variable */
+    STATUS_WQ_BLOCKED,          /**< waiting in a wait queue */
     STATUS_RUNNING,             /**< currently running */
     STATUS_PENDING,             /**< waiting to be scheduled to run */
     STATUS_NUMOF                /**< number of supported thread states */
diff --git a/core/include/wait_queue.h b/core/include/wait_queue.h
new file mode 100644
index 000000000000..be2188bc1cd9
--- /dev/null
+++ b/core/include/wait_queue.h
@@ -0,0 +1,242 @@
+/*
+ * Copyright (C) 2025 Mihai Renea
+ *
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @defgroup core_sync_wait_queue Wait Queue
+ * @ingroup  core_sync
+ * @brief    Linux-like wait queue for condition signaling
+ *
+ * Wait queues enable lock-free, IRQ-safe condition signaling. This
+ * implementation is inspired by the Linux kernel.
+ *
+ * Wait queues have similar semantics to condition variables, but don't require
+ * setting the condition and signaling it to be atomic, hence no mutex is
+ * needed. In turn, one may safely call queue_wake() from an ISR. Note that
+ * while cond_signal() and cond_broadcast() are safe to call from an ISR
+ * context too, doing so will very probably cause a race condition elsewhere.
+ * Consider the following scenario using condition variables:
+ *
+ * ```
+ * static uint64_t measurement;
+ * mutex_t cond_lock = MUTEX_INIT;
+ * cond_t cond = COND_INIT;
+ *
+ * void measurement_irq(void)
+ * {
+ *     measurement = measure();
+ *     cond_broadcast(&cond);
+ * }
+ *
+ * void wait_for_critical_value(void)
+ * {
+ *     mutex_lock(&cond_lock);
+ *     while (atomic_load_u64(&measurement) < THRESHOLD) {
+ *         cond_wait(&cond, &cond_lock);
+ *     }
+ *     mutex_unlock(&cond_lock);
+ * }
+ * ```
+ *
+ * Note that the mutex is there only for the cond_wait() API call, as we're not
+ * allowed to call mutex_lock() inside the ISR. This alone is a hint that
+ * something isn't right, and indeed, the following sequence of events is
+ * possible:
+ *  1. the thread sees measurement < THRESHOLD and is about to call cond_wait()
+ *  2. the ISR fires, sets measurement = THRESHOLD, and signals the condition
+ *  3. the thread calls cond_wait() and goes to sleep, possibly forever
+ *
+ * Using a wait queue, we can do this:
+ *
+ * ```
+ * static uint64_t measurement;
+ * wait_queue_t wq = WAIT_QUEUE_INIT;
+ *
+ * void measurement_irq(void)
+ * {
+ *     measurement = measure();
+ *     queue_wake(&wq);
+ * }
+ *
+ * void wait_for_critical_value(void)
+ * {
+ *     QUEUE_WAIT(&wq, atomic_load_u64(&measurement) >= THRESHOLD);
+ * }
+ * ```
+ *
+ * This is free of the race condition above because QUEUE_WAIT() is a macro
+ * that checks the condition AFTER queueing the current thread to be woken up.
+ *
+ * When to use?
+ *
+ * QUEUE_WAIT() is a macro and might come with some additional code size cost
+ * due to inlining. If you're not synchronizing with an ISR and care very much
+ * about code size, then go for condition variables; otherwise there is no
+ * reason not to use the wait queue.
+ *
+ * Can't I just use a mutex?
+ *
+ * You can abuse a mutex by locking it in a loop in the thread context and
+ * unlocking it from the ISR context.
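+ * For example, one might write (a hypothetical sketch reusing the
+ * measurement example from above; `wait_lock` is illustrative):
+ *
+ * ```
+ * static uint64_t measurement;
+ * mutex_t wait_lock = MUTEX_INIT_LOCKED;
+ *
+ * void measurement_irq(void)
+ * {
+ *     measurement = measure();
+ *     mutex_unlock(&wait_lock);
+ * }
+ *
+ * void wait_for_critical_value(void)
+ * {
+ *     // the mutex starts out locked, so each iteration blocks until the ISR
+ *     // unlocks it
+ *     while (atomic_load_u64(&measurement) < THRESHOLD) {
+ *         mutex_lock(&wait_lock);
+ *     }
+ * }
+ * ```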
+ * But this works only for a single waiter and makes improper use of the
+ * mutex semantics.
+ *
+ * @{
+ *
+ * @file   wait_queue.h
+ * @brief  Linux-like wait queue for condition signaling
+ *
+ * @author Mihai Renea
+ */
+
+#ifndef WAIT_QUEUE_H
+#define WAIT_QUEUE_H
+
+#include "sched.h"
+
+#ifdef __cplusplus
extern "C" {
+#endif
+
+/**
+ * @brief Forward declaration for @ref wait_queue_entry_t
+ */
+typedef struct wait_queue_entry wait_queue_entry_t;
+
+/**
+ * @cond INTERNAL
+ * @brief Wait queue entry
+ *
+ * Always allocated on the stack. We can't use the linked list node in thread_t
+ * because while evaluating the condition expression the thread may:
+ *  - queue on some other wait queue
+ *  - block on something else, e.g. a mutex
+ */
+struct wait_queue_entry {
+    wait_queue_entry_t *next; /**< next entry in the wait queue */
+    thread_t *thread;         /**< thread blocking on the queue */
+};
+/**
+ * @endcond
+ */
+
+/**
+ * @brief Wait queue struct
+ */
+typedef struct {
+    wait_queue_entry_t *list; /**< list of waiters, sorted by thread priority */
+} wait_queue_t;
+
+/**
+ * @cond INTERNAL
+ * @brief List terminator for the wait queue
+ */
+#define WAIT_QUEUE_TAIL ((void *)(-1))
+/**
+ * @endcond
+ */
+
+/**
+ * @brief Init a wait queue
+ */
+#define WAIT_QUEUE_INIT { .list = WAIT_QUEUE_TAIL }
+
+/**
+ * @brief Enable @ref QUEUE_WAIT() early exit optimization if the condition
+ *        evaluates to true.
+ *
+ * This optimization is turned off only to facilitate the testing of some edge
+ * cases. There's no good reason to disable it other than to save a few bytes.
+ */
+#ifndef CONFIG_QUEUE_WAIT_EARLY_EXIT
+#  define CONFIG_QUEUE_WAIT_EARLY_EXIT 1
+#endif
+
+/**
+ * @cond INTERNAL
+ */
+#if CONFIG_QUEUE_WAIT_EARLY_EXIT
+#  define BREAK_IF_TRUE(cond) \
+    if (cond) {               \
+        break;                \
+    }
+#else
+#  define BREAK_IF_TRUE(cond) (void)(0)
+#endif
+
+/* For internal use within the @ref QUEUE_WAIT() macro only. Not the cleanest
+ * decomposition, but we want to keep the macro tight. */
+void _prepare_to_wait(wait_queue_t *wq, wait_queue_entry_t *entry);
+void _maybe_yield_and_enqueue(wait_queue_t *wq, wait_queue_entry_t *entry);
+void _wait_dequeue(wait_queue_t *wq, wait_queue_entry_t *entry);
+
+/* For internal use only in queue_wake() and queue_wake_exclusive(). */
+void _queue_wake_common(wait_queue_t *wq, bool all);
+/**
+ * @endcond
+ */
+
+/**
+ * @brief Wait for a condition to become true.
+ *
+ * Will not return for as long as the condition expression @p cond evaluates to
+ * false.
+ *
+ * @note @p cond may get evaluated multiple times.
+ *
+ * @note The interrupt state at the moment of calling this macro will be
+ *       restored before executing the condition expression and before
+ *       returning, but interrupts MAY get enabled if the condition evaluates
+ *       false, as the thread MAY have to go to sleep.
+ *
+ * @warning @p cond is NOT executed atomically. If that is a requirement, you
+ *          can:
+ *          - lock within the expression, e.g. lock a mutex or disable
+ *            interrupts.
+ *          - call this with interrupts disabled. Interrupts will be kept
+ *            disabled during the condition evaluation.
+ *
+ * @param[in] wq   wait queue to wait on
+ * @param[in] cond condition expression to be evaluated
+ */
+#define QUEUE_WAIT(wq, cond)                   \
+    do {                                       \
+        BREAK_IF_TRUE(cond);                   \
+                                               \
+        wait_queue_entry_t me;                 \
+        _prepare_to_wait(wq, &me);             \
+        while (!(cond)) {                      \
+            _maybe_yield_and_enqueue(wq, &me); \
+        }                                      \
+        _wait_dequeue(wq, &me);                \
+    } while (0)
+
+/**
+ * @brief Wake one thread queued on the wait queue.
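+ *
+ * Waiters are queued in priority order, so the highest-priority waiter is the
+ * first candidate to be woken. Waking is only a hint: the woken thread
+ * re-evaluates its condition expression and goes back to sleep if it still
+ * evaluates to false.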
+ *
+ * @param wq wait queue to signal
+ */
+static inline void queue_wake_exclusive(wait_queue_t *wq)
+{
+    _queue_wake_common(wq, false);
+}
+
+/**
+ * @brief Wake all threads queued on the wait queue.
+ *
+ * @param wq wait queue to signal
+ */
+static inline void queue_wake(wait_queue_t *wq)
+{
+    _queue_wake_common(wq, true);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* WAIT_QUEUE_H */
+/** @} */
diff --git a/core/thread.c b/core/thread.c
index 979a95b90112..0c7387b926f1 100644
--- a/core/thread.c
+++ b/core/thread.c
@@ -364,6 +364,7 @@ static const char *state_names[STATUS_NUMOF] = {
     [STATUS_FLAG_BLOCKED_ALL] = "bl allfl",
     [STATUS_MBOX_BLOCKED] = "bl mbox",
     [STATUS_COND_BLOCKED] = "bl cond",
+    [STATUS_WQ_BLOCKED] = "bl wq",
     [STATUS_RUNNING] = "running",
     [STATUS_PENDING] = "pending",
 };
diff --git a/core/wq.c b/core/wq.c
new file mode 100644
index 000000000000..6c90ec14bc1e
--- /dev/null
+++ b/core/wq.c
@@ -0,0 +1,160 @@
+/*
+ * Copyright (C) 2025 Mihai Renea
+ *
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @ingroup core_sync
+ * @{
+ *
+ * @file
+ * @brief  Wait queue implementation
+ *
+ * @author Mihai Renea
+ *
+ * @}
+ */
+
+#include "irq.h"
+#include "wait_queue.h"
+
+#define ENABLE_DEBUG 0
+#include "debug.h"
+
+static inline bool _is_in_wq(wait_queue_entry_t *entry)
+{
+    /* A queued entry points either to the next entry or to WAIT_QUEUE_TAIL. */
+    return entry->next != NULL;
+}
+
+/* @pre  interrupts disabled, de-queued
+ * @post interrupts restored to @p irq_state */
+static void _wait_enqueue(wait_queue_t *wq, wait_queue_entry_t *entry, int irq_state)
+{
+    assert(!_is_in_wq(entry));
+
+    wait_queue_entry_t **curr_pp = &wq->list;
+    while ((*curr_pp != WAIT_QUEUE_TAIL) &&
+           (*curr_pp)->thread->priority <= entry->thread->priority) {
+        curr_pp = &(*curr_pp)->next;
+    }
+
+    entry->next = *curr_pp;
+    *curr_pp = entry;
+
+    irq_restore(irq_state);
+}
+
+void _prepare_to_wait(wait_queue_t *wq, wait_queue_entry_t *entry)
+{
+    int irq_state = irq_disable();
+
+    *entry = (wait_queue_entry_t) {
+        .thread = thread_get_active(),
+        .next = NULL,
+    };
+
+    _wait_enqueue(wq, entry, irq_state);
+}
+
+void _wait_dequeue(wait_queue_t *wq, wait_queue_entry_t *entry)
+{
+    int irq_state = irq_disable();
+
+    wait_queue_entry_t **curr_pp = &wq->list;
+    while (*curr_pp != WAIT_QUEUE_TAIL) {
+        if (*curr_pp == entry) {
+            *curr_pp = (*curr_pp)->next;
+#ifndef NDEBUG
+            /* Mark as not queued only for debugging, as the entry is about to
+             * go out of scope anyway. */
+            entry->next = NULL;
+#endif
+            break;
+        }
+        curr_pp = &(*curr_pp)->next;
+    }
+
+    assert(entry->next == NULL);
+
+    irq_restore(irq_state);
+}
+
+void _maybe_yield_and_enqueue(wait_queue_t *wq, wait_queue_entry_t *entry)
+{
+    int irq_state = irq_disable();
+    if (!_is_in_wq(entry)) {
+        /* Queue got signaled while evaluating the condition expression. Don't
+         * go to sleep but re-evaluate the condition. */
+        _wait_enqueue(wq, entry, irq_state);
+        return;
+    }
+
+    sched_set_status(entry->thread, STATUS_WQ_BLOCKED);
+    /* _queue_wake_common(wq) can't tell whether the thread is sleeping on wq
+     * or on some other queue, which is possible during the condition
+     * expression evaluation. But we can "mark" the thread with the queue it's
+     * actually sleeping on by having the thread's linked list slot (which we
+     * don't use otherwise) point back to this entry.
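+     * _queue_wake_common() then compares this back-pointer against the
+     * entries in its list, so it can tell whether a STATUS_WQ_BLOCKED thread
+     * is blocked on this very queue.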
+     */
+    entry->thread->rq_entry.next = (void *)entry;
+
+    irq_enable();
+    thread_yield_higher();
+    irq_disable();
+
+    _wait_enqueue(wq, entry, irq_state);
+}
+
+void _queue_wake_common(wait_queue_t *wq, bool all)
+{
+    int irq_state = irq_disable();
+
+    uint16_t highest_prio = THREAD_PRIORITY_MIN + 1;
+
+    wait_queue_entry_t *head;
+    while ((head = wq->list) != WAIT_QUEUE_TAIL) {
+        thread_t *thread = head->thread;
+        /* Wake the thread only if it blocks on THIS queue, otherwise:
+         *  - it is already on the run queue, dead or whatever, in which case
+         *    there is nothing to be done, or
+         *  - it blocks on something else (e.g. a mutex) while evaluating the
+         *    condition expression, in which case we may not wake it up, as
+         *    RIOT's locking primitives don't expect spurious wake-ups. This
+         *    can also be another wait queue, which is why we check if the
+         *    thread points back to this wait-queue entry (see
+         *    _maybe_yield_and_enqueue()). */
+        if (thread->status == STATUS_WQ_BLOCKED &&
+            (wait_queue_entry_t *)thread->rq_entry.next == head) {
+            sched_set_status(thread, STATUS_PENDING);
+            if (highest_prio == THREAD_PRIORITY_MIN + 1) {
+                /* First thread to be woken up. We don't care about the
+                 * priorities of subsequent threads - they must be equal or
+                 * lower. */
+                highest_prio = thread->priority;
+            }
+
+            DEBUG("wq: woke up thread %d\n", head->thread->pid);
+        }
+        else {
+            DEBUG("wq: won't wake thread %d in state `%s`\n",
+                  head->thread->pid,
+                  thread_state_to_string(head->thread->status));
+        }
+        /* We remove the thread from the wait queue in all cases so that
+         * _maybe_yield_and_enqueue() sees the state change and forces another
+         * condition check instead of going to sleep. */
+        wq->list = head->next;
+        head->next = NULL;
+
+        if (!all) {
+            break;
+        }
+    }
+
+    irq_enable();
+    sched_switch(highest_prio);
+    irq_restore(irq_state);
+}
diff --git a/tests/core/wait_queue/Makefile b/tests/core/wait_queue/Makefile
new file mode 100644
index 000000000000..e9444155c161
--- /dev/null
+++ b/tests/core/wait_queue/Makefile
@@ -0,0 +1,8 @@
+include ../Makefile.core_common
+
+include Makefile.common
+
+# disable early exit optimization to facilitate the testing of some edge cases
+CFLAGS += -DCONFIG_QUEUE_WAIT_EARLY_EXIT=0
+
+include $(RIOTBASE)/Makefile.include
diff --git a/tests/core/wait_queue/Makefile.ci b/tests/core/wait_queue/Makefile.ci
new file mode 100644
index 000000000000..7092fda88a57
--- /dev/null
+++ b/tests/core/wait_queue/Makefile.ci
@@ -0,0 +1,29 @@
+BOARD_INSUFFICIENT_MEMORY := \
+    arduino-duemilanove \
+    arduino-leonardo \
+    arduino-nano \
+    arduino-uno \
+    atmega328p \
+    atmega328p-xplained-mini \
+    atmega8 \
+    bluepill-stm32f030c8 \
+    i-nucleo-lrwan1 \
+    nucleo-f030 \
+    nucleo-f030r8 \
+    nucleo-f031k6 \
+    nucleo-f042k6 \
+    nucleo-l011k4 \
+    nucleo-l031k6 \
+    nucleo-l053 \
+    nucleo-l053r8 \
+    nucleo32-f031 \
+    nucleo32-f042 \
+    nucleo32-l031 \
+    samd10-xmini \
+    slstk3400a \
+    stk3200 \
+    stm32f030f4-demo \
+    stm32f0discovery \
+    stm32g0316-disco \
+    stm32l0538-disco \
+    #
diff --git a/tests/core/wait_queue/Makefile.common b/tests/core/wait_queue/Makefile.common
new file mode 100644
index 000000000000..3097173b6ca3
--- /dev/null
+++ b/tests/core/wait_queue/Makefile.common
@@ -0,0 +1,6 @@
+USEMODULE += ztimer
+USEMODULE += ztimer_msec
+USEMODULE += sema
+
+DEVELHELP ?= 0
+ASSERT_VERBOSE ?= 0
diff --git a/tests/core/wait_queue/README.md b/tests/core/wait_queue/README.md
new file mode 100644
index 000000000000..b9f4e68b67be
--- /dev/null
+++ b/tests/core/wait_queue/README.md
@@ -0,0 +1,8 @@
+Test for the wait queue
+=======================
+
+This test disables the early exit optimization of the `QUEUE_WAIT()` macro, as
+it hinders the testing of some edge cases and plays only a trivial role in
+correctness. For completeness, the [wait_queue_optimized
+test](../wait_queue_optimized/) runs this same test with the early exit
+optimization on, but some sections of the test are not performed.
diff --git a/tests/core/wait_queue/main.c b/tests/core/wait_queue/main.c
new file mode 100644
index 000000000000..a6784fcc4499
--- /dev/null
+++ b/tests/core/wait_queue/main.c
@@ -0,0 +1,453 @@
+/*
+ * Copyright (C) 2025 Mihai Renea
+ *
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @ingroup tests
+ * @{
+ *
+ * @file
+ * @brief  Test application for wait queues
+ *
+ * @author Mihai Renea
+ * @}
+ */
+
+#include "wait_queue.h"
+
+#include <errno.h>
+#include <limits.h>
+
+#include "ztimer.h"
+#include "atomic_utils.h"
+#include "sema.h"
+#include "test_utils/expect.h"
+
+#define ENABLE_DEBUG 0
+#include "debug.h"
+
+#define WAITERS_CNT 2
+
+static char stacks[WAITERS_CNT][THREAD_STACKSIZE_MAIN];
+static wait_queue_t wq = WAIT_QUEUE_INIT;
+static uint64_t cond_val = 0;
+static sema_t woken_cnt = SEMA_CREATE(0);
+
+#define COND_ITER_CNT_INIT (CONFIG_QUEUE_WAIT_EARLY_EXIT ? (uint32_t)-WAITERS_CNT : 0)
+static uint32_t cond_iter_cnt = COND_ITER_CNT_INIT;
+
+/* increment a counter before the condition expression is evaluated */
+#define COUNTING_COND(cnt, expr) (atomic_fetch_add_u32(&(cnt), 1), expr)
+
+/* assert interrupts enabled before evaluating the expression */
+#define IRQ_ON(expr) (expect(irq_is_enabled()), expr)
+
+/* assert interrupts disabled before evaluating the expression */
+#define IRQ_OFF(expr) (expect(!irq_is_enabled()), expr)
+
+#define COND_VAL_THRESHOLD (UINT32_MAX + 1ULL)
+
+static void timeout_cb(void *arg)
+{
+    bool broadcast = arg;
+
+    cond_val = COND_VAL_THRESHOLD;
+    if (broadcast) {
+        queue_wake(&wq);
+    }
+    else {
+        queue_wake_exclusive(&wq);
+    }
+}
+
+static void *waiter_nonblocking(void *arg)
+{
+    QUEUE_WAIT(&wq, COUNTING_COND(cond_iter_cnt,
+        IRQ_ON(atomic_load_u64(&cond_val) >= COND_VAL_THRESHOLD)));
+
+    sema_post(&woken_cnt);
+    DEBUG("waiter %u awake!\n", (unsigned)arg);
+
+    return NULL;
+}
+
+static void *waiter_nonblocking_irqdisabled(void *arg)
+{
+    irq_disable();
+    QUEUE_WAIT(&wq, COUNTING_COND(cond_iter_cnt,
+        IRQ_OFF(cond_val >= COND_VAL_THRESHOLD)));
+    expect(!irq_is_enabled());
+    irq_enable();
+
+    sema_post(&woken_cnt);
+    DEBUG("waiter %u awake!\n", (unsigned)arg);
+
+    return NULL;
+}
+
+void test_waiters_nonblocking(thread_task_func_t waiter_func)
+{
+    cond_val = 0;
+    cond_iter_cnt = COND_ITER_CNT_INIT;
+
+    int thread_ids[WAITERS_CNT];
+    for (unsigned i = 0; i < WAITERS_CNT; i++) {
+        thread_ids[i] = thread_create(stacks[i], sizeof(stacks[0]),
+                                      THREAD_PRIORITY_MAIN - i - 1,
+                                      THREAD_CREATE_STACKTEST, waiter_func,
+                                      (void *)i, "waiter");
+        expect(thread_ids[i] >= 0);
+    }
+
+    /* waiters have higher prio, they should all be enqueued now */
+    expect(wq.list != WAIT_QUEUE_TAIL);
+    uint32_t cond_iter_cnt_expected = WAITERS_CNT;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    ztimer_t timer = {
+        .callback = timeout_cb,
+        .arg = (void *)false, // wake exclusive
+    };
+    ztimer_set(ZTIMER_MSEC, &timer, 100);
+
+    ztimer_sleep(ZTIMER_MSEC, 50);
+
+    int ret = sema_try_wait(&woken_cnt);
+    expect(ret == -EAGAIN);
+
+    /* Wake all, but they should go back to sleep because the condition is
+     * not met yet */
+    queue_wake(&wq);
+
+    ret = sema_try_wait(&woken_cnt);
+    expect(ret == -EAGAIN);
+
+    cond_iter_cnt_expected += WAITERS_CNT;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    ret = sema_wait_timed_ztimer(&woken_cnt, ZTIMER_MSEC, 51);
+    expect(ret == 0);
+
+    cond_iter_cnt_expected += 1;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+    /* last thread had highest prio */
+    expect(thread_get(thread_ids[WAITERS_CNT - 1]) == NULL);
+
+    ztimer_sleep(ZTIMER_MSEC, 10);
+
+    ret = sema_try_wait(&woken_cnt);
+    expect(ret == -EAGAIN);
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    cond_val = 0;
+
+    /* Wake all, but they should go back to sleep because the condition is
+     * not met yet */
+    queue_wake(&wq);
+
+    cond_iter_cnt_expected += WAITERS_CNT - 1;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    ret = sema_try_wait(&woken_cnt);
+    expect(ret == -EAGAIN);
+
+    timer.arg = (void *)true; // wake all
+    ztimer_set(ZTIMER_MSEC, &timer, 10);
+
+    for (unsigned i = 0; i < (WAITERS_CNT - 1); i++) {
+        ret = sema_wait_timed_ztimer(&woken_cnt, ZTIMER_MSEC, 11);
+        expect(ret == 0);
+    }
+
+    cond_iter_cnt_expected += WAITERS_CNT - 1;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    /* just to make sure all threads finish */
+    ztimer_sleep(ZTIMER_MSEC, 2);
+}
+
+void test_waiters_lowprio(void)
+{
+    cond_val = 0;
+    cond_iter_cnt = COND_ITER_CNT_INIT;
+
+    int thread_ids[WAITERS_CNT];
+    for (unsigned i = 0; i < WAITERS_CNT; i++) {
+        thread_ids[i] = thread_create(stacks[i], sizeof(stacks[0]),
+                                      THREAD_PRIORITY_MAIN + i + 1,
+                                      THREAD_CREATE_STACKTEST,
+                                      waiter_nonblocking, (void *)i, "waiter");
+        expect(thread_ids[i] >= 0);
+    }
+
+    ztimer_sleep(ZTIMER_MSEC, 2);
+
+    /* they should all be enqueued now */
+    expect(wq.list != WAIT_QUEUE_TAIL);
+    uint32_t cond_iter_cnt_expected = WAITERS_CNT;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    cond_val = COND_VAL_THRESHOLD;
+    queue_wake_exclusive(&wq);
+
+    expect(wq.list != WAIT_QUEUE_TAIL);
+
+    /* busy-wait for a while */
+    for (unsigned i = 0; i < (UINT_MAX > 1000000UL ? 1000000UL : UINT_MAX - 1); i++) {
+        expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+    }
+
+    int ret = sema_wait(&woken_cnt);
+    expect(ret == 0);
+
+    cond_iter_cnt_expected += 1;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    /* Just to make sure the thread finishes, as it has lower prio. */
+    ztimer_sleep(ZTIMER_MSEC, 10);
+    expect(thread_get(thread_ids[0]) == NULL);
+
+    queue_wake(&wq);
+
+    for (unsigned i = 0; i < WAITERS_CNT - 1; i++) {
+        ret = sema_wait(&woken_cnt);
+        expect(ret == 0);
+    }
+
+    cond_iter_cnt_expected += WAITERS_CNT - 1;
+    expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected);
+
+    /* Just to make sure all threads finish, as they have low prio.
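+     * Sleeping blocks the main thread, so the scheduler can run the
+     * lower-priority waiters to completion.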
*/ + ztimer_sleep(ZTIMER_MSEC, 50); +} + +void test_cond_already_true(void) +{ + cond_val = COND_VAL_THRESHOLD; + /* cond expression will always get executed only once */ + cond_iter_cnt = 0; + + int thread_ids[WAITERS_CNT]; + for (unsigned i = 0; i < WAITERS_CNT; i++) { + thread_ids[i] = thread_create(stacks[i], sizeof(stacks[0]), + THREAD_PRIORITY_MAIN - i - 1, + THREAD_CREATE_STACKTEST, waiter_nonblocking, + (void *)i, "waiter"); + expect(thread_ids[i] >= 0); + } + + expect(wq.list == WAIT_QUEUE_TAIL); + expect(atomic_load_u32(&cond_iter_cnt) == WAITERS_CNT); + + for (unsigned i = 0; i < WAITERS_CNT; i++) { + int ret = sema_wait(&woken_cnt); + expect(ret == 0); + } + + ztimer_sleep(ZTIMER_MSEC, 2); +} + +#if !CONFIG_QUEUE_WAIT_EARLY_EXIT + +static mutex_t cond_mutex = MUTEX_INIT; + +static bool cond_fn_mutex(void) +{ + mutex_lock(&cond_mutex); + bool ret = atomic_load_u64(&cond_val) >= COND_VAL_THRESHOLD; + mutex_unlock(&cond_mutex); + + return ret; +} + +static wait_queue_t nested_wq = WAIT_QUEUE_INIT; +static uint32_t nested_cond_iter_cnt = 0; +static uint64_t nested_cond_val = 0; + +static bool cond_fn_wq(unsigned waiter_no) +{ + QUEUE_WAIT(&nested_wq, + COUNTING_COND(nested_cond_iter_cnt, + atomic_load_u64(&nested_cond_val) >= COND_VAL_THRESHOLD)); + DEBUG("waiter %u: past inner wq\n", waiter_no); + return atomic_load_u64(&cond_val) >= COND_VAL_THRESHOLD; +} + +static void *waiter_blocking_queue(void *arg) +{ + QUEUE_WAIT(&wq, COUNTING_COND(cond_iter_cnt, cond_fn_wq((unsigned)arg))); + + sema_post(&woken_cnt); + DEBUG("waiter %u awake!\n", (unsigned)arg); + + return NULL; +} + +static void *waiter_blocking_mutex(void *arg) +{ + QUEUE_WAIT(&wq, COUNTING_COND(cond_iter_cnt, cond_fn_mutex())); + + sema_post(&woken_cnt); + DEBUG("waiter %u awake!\n", (unsigned)arg); + + return NULL; +} + +void test_waiters_blocking_mutex(void) +{ + cond_val = 0; + cond_iter_cnt = 0; + + mutex_lock(&cond_mutex); + + int thread_ids[WAITERS_CNT]; + for (unsigned i = 0; i < WAITERS_CNT; i++) { + thread_ids[i] = thread_create(stacks[i], sizeof(stacks[0]), + THREAD_PRIORITY_MAIN - i - 1, + THREAD_CREATE_STACKTEST, + waiter_blocking_mutex, (void *)i, "waiter"); + expect(thread_ids[i] >= 0); + } + + /* waiters have higher prio, they should all be enqueued now */ + expect(wq.list != WAIT_QUEUE_TAIL); + uint32_t cond_iter_cnt_expected = WAITERS_CNT; + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + /* wake up all, but since they block on the mutex they should stay asleep */ + cond_val = COND_VAL_THRESHOLD; + queue_wake(&wq); + expect(wq.list == WAIT_QUEUE_TAIL); + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + /* they should all wake up now and do another condition check, then wait + * once again */ + cond_val = 0; + mutex_unlock(&cond_mutex); + + cond_iter_cnt_expected += WAITERS_CNT; + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + cond_val = COND_VAL_THRESHOLD; + queue_wake(&wq); + + cond_iter_cnt_expected += WAITERS_CNT; + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + for (unsigned i = 0; i < WAITERS_CNT; i++) { + int ret = sema_try_wait(&woken_cnt); + expect(ret == 0); + } + + /* just to make sure all threads finish */ + ztimer_sleep(ZTIMER_MSEC, 5); +} + +void test_waiters_blocking_wq(void) +{ + /* condition is already met, but the expression will block on the inner + * wait queue */ + cond_val = COND_VAL_THRESHOLD; + cond_iter_cnt = 0; + nested_cond_val = 0; + + int thread_ids[WAITERS_CNT]; + for (unsigned i = 
0; i < WAITERS_CNT; i++) { + thread_ids[i] = thread_create(stacks[i], sizeof(stacks[0]), + THREAD_PRIORITY_MAIN - i - 1, + THREAD_CREATE_STACKTEST, + waiter_blocking_queue, (void *)i, "waiter"); + expect(thread_ids[i] >= 0); + } + + nested_cond_val = COND_VAL_THRESHOLD; + /* waiters have higher prio, they should all be enqueued now */ + expect(wq.list != WAIT_QUEUE_TAIL); + uint32_t cond_iter_cnt_expected = WAITERS_CNT; + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + expect(nested_wq.list != WAIT_QUEUE_TAIL); + uint32_t nested_cond_iter_cnt_expected = WAITERS_CNT; + expect(atomic_load_u32(&nested_cond_iter_cnt) == nested_cond_iter_cnt_expected); + + /* wake up all, but since they block on the inner wq they should stay asleep */ + cond_val = COND_VAL_THRESHOLD; + queue_wake(&wq); + expect(wq.list == WAIT_QUEUE_TAIL); + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + /* nothing should have changed on the nested wait queue */ + expect(nested_wq.list != WAIT_QUEUE_TAIL); + expect(atomic_load_u32(&nested_cond_iter_cnt) == nested_cond_iter_cnt_expected); + + /* they should all wake up now and do another NESTED condition check, then wait + * once again */ + nested_cond_val = 0; + queue_wake(&nested_wq); + + nested_cond_iter_cnt_expected += WAITERS_CNT; + expect(nested_wq.list != WAIT_QUEUE_TAIL); + expect(atomic_load_u32(&nested_cond_iter_cnt) == nested_cond_iter_cnt_expected); + + /* nothing should have changed on the OUTER wait queue */ + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + expect(wq.list == WAIT_QUEUE_TAIL); + + /* The NESTED wait loop should finish, but the condition for the OUTER + * wait queue is not met */ + cond_val = 0; + nested_cond_val = COND_VAL_THRESHOLD; + queue_wake(&nested_wq); + + cond_iter_cnt_expected += WAITERS_CNT; + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + expect(nested_wq.list == WAIT_QUEUE_TAIL); + expect(wq.list != WAIT_QUEUE_TAIL); + + cond_val = COND_VAL_THRESHOLD; + queue_wake(&wq); + + cond_iter_cnt_expected += WAITERS_CNT; + expect(atomic_load_u32(&cond_iter_cnt) == cond_iter_cnt_expected); + + for (unsigned i = 0; i < WAITERS_CNT; i++) { + int ret = sema_try_wait(&woken_cnt); + expect(ret == 0); + } + + /* just to make sure all threads finish */ + ztimer_sleep(ZTIMER_MSEC, 2); +} + +#endif + +int main(void) +{ + test_waiters_nonblocking(waiter_nonblocking); + puts("tested non-blocking condition expression"); + + test_waiters_nonblocking(waiter_nonblocking_irqdisabled); + puts("tested non-blocking condition expression (IRQ disabled)"); + + test_waiters_lowprio(); + puts("tested non-blocking condition expression (waiters have low prio)"); + + test_cond_already_true(); + puts("tested condition already true"); + +#if !CONFIG_QUEUE_WAIT_EARLY_EXIT + test_waiters_blocking_mutex(); + puts("tested condition expression blocking on mutex"); + + test_waiters_blocking_wq(); + puts("tested condition expression blocking on a nested wait queue"); +#endif + + puts("-----\nTest successful!"); + + return 0; +} diff --git a/tests/core/wait_queue/tests/01-run.py b/tests/core/wait_queue/tests/01-run.py new file mode 100755 index 000000000000..ccc946439706 --- /dev/null +++ b/tests/core/wait_queue/tests/01-run.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2025 Mihai Renea +# +# This file is subject to the terms and conditions of the GNU Lesser +# General Public License v2.1. See the file LICENSE in the top level +# directory for more details. 
+ +# Author: Mihai Renea + +import sys +from testrunner import run + + +def testfunc(child): + child.expect("Test successful!") + + +if __name__ == "__main__": + sys.exit(run(testfunc)) diff --git a/tests/core/wait_queue_optimized/Makefile b/tests/core/wait_queue_optimized/Makefile new file mode 100644 index 000000000000..73032fc64f91 --- /dev/null +++ b/tests/core/wait_queue_optimized/Makefile @@ -0,0 +1,5 @@ +include ../Makefile.core_common + +include ../wait_queue/Makefile.common + +include $(RIOTBASE)/Makefile.include diff --git a/tests/core/wait_queue_optimized/Makefile.ci b/tests/core/wait_queue_optimized/Makefile.ci new file mode 120000 index 000000000000..ca92f1f7de1f --- /dev/null +++ b/tests/core/wait_queue_optimized/Makefile.ci @@ -0,0 +1 @@ +../wait_queue/Makefile.ci \ No newline at end of file diff --git a/tests/core/wait_queue_optimized/README.md b/tests/core/wait_queue_optimized/README.md new file mode 100644 index 000000000000..1855ed80e8a9 --- /dev/null +++ b/tests/core/wait_queue_optimized/README.md @@ -0,0 +1,6 @@ +Test for the wait queue (optimized) +=================================== + +This tests the wait queue with the early exit optimization enabled (default). +It performs a subset of the tests in [wait_queue test](../wait_queue/). See the +[README](../wait_queue/README.md) for why not all the tests can be run. diff --git a/tests/core/wait_queue_optimized/main.c b/tests/core/wait_queue_optimized/main.c new file mode 120000 index 000000000000..47617773e1ef --- /dev/null +++ b/tests/core/wait_queue_optimized/main.c @@ -0,0 +1 @@ +../wait_queue/main.c \ No newline at end of file diff --git a/tests/core/wait_queue_optimized/tests/01-run.py b/tests/core/wait_queue_optimized/tests/01-run.py new file mode 120000 index 000000000000..f270ae8c293c --- /dev/null +++ b/tests/core/wait_queue_optimized/tests/01-run.py @@ -0,0 +1 @@ +../../wait_queue/tests/01-run.py \ No newline at end of file