Skip to content

Commit

Permalink
Add internal reference counting to semaphores.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pansysk75 committed Jul 11, 2023
1 parent 8a949a7 commit 9e45942
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once

#include <hpx/config.hpp>
#include <hpx/memory/serialization/intrusive_ptr.hpp>
#include <hpx/synchronization/detail/counting_semaphore.hpp>
#include <hpx/synchronization/spinlock.hpp>
#include <hpx/timing/steady_clock.hpp>
Expand Down Expand Up @@ -350,6 +351,8 @@ namespace hpx {

protected:
using mutex_type = Mutex;
using data_type =
lcos::local::detail::counting_semaphore_data<mutex_type>;

public:
static constexpr std::ptrdiff_t(max)() noexcept
Expand All @@ -358,35 +361,39 @@ namespace hpx {
}

explicit counting_semaphore(std::ptrdiff_t value) noexcept
: sem_(value)
: data_(new data_type(value), false)
{
}

~counting_semaphore() = default;

void release(std::ptrdiff_t update = 1)
{
std::unique_lock<mutex_type> l(mtx_);
sem_.signal(HPX_MOVE(l), update);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
data->sem_.signal(HPX_MOVE(l), update);
}

bool try_acquire() noexcept
{
std::unique_lock<mutex_type> l(mtx_);
return sem_.try_acquire(l);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
return data->sem_.try_acquire(l);
}

void acquire()
{
std::unique_lock<mutex_type> l(mtx_);
sem_.wait(l, 1);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
data->sem_.wait(l, 1);
}

bool try_acquire_until(
hpx::chrono::steady_time_point const& abs_time)
{
std::unique_lock<mutex_type> l(mtx_);
return sem_.wait_until(l, abs_time, 1);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
return data->sem_.wait_until(l, abs_time, 1);
}

bool try_acquire_for(hpx::chrono::steady_duration const& rel_time)
Expand All @@ -395,8 +402,7 @@ namespace hpx {
}

protected:
mutable mutex_type mtx_;
hpx::lcos::local::detail::counting_semaphore sem_;
hpx::intrusive_ptr<data_type> data_;
};
} // namespace detail

Expand All @@ -410,6 +416,7 @@ namespace hpx {
{
private:
using mutex_type = Mutex;
using detail::counting_semaphore<PTRDIFF_MAX, Mutex>::data_;

public:
explicit counting_semaphore_var(std::ptrdiff_t value = N) noexcept
Expand All @@ -420,29 +427,35 @@ namespace hpx {
counting_semaphore_var(counting_semaphore_var const&) = delete;
counting_semaphore_var& operator=(
counting_semaphore_var const&) = delete;
counting_semaphore_var(counting_semaphore_var&&) = delete;
counting_semaphore_var& operator=(counting_semaphore_var&&) = delete;

void wait(std::ptrdiff_t count = 1)
{
std::unique_lock<mutex_type> l(this->mtx_);
this->sem_.wait(l, count);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
data->sem_.wait(l, count);
}

bool try_wait(std::ptrdiff_t count = 1)
{
std::unique_lock<mutex_type> l(this->mtx_);
return this->sem_.try_wait(l, count);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
return data->sem_.try_wait(l, count);
}

void signal(std::ptrdiff_t count = 1)
{
std::unique_lock<mutex_type> l(this->mtx_);
this->sem_.signal(HPX_MOVE(l), count);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
data->sem_.signal(HPX_MOVE(l), count);
}

std::ptrdiff_t signal_all()
{
std::unique_lock<mutex_type> l(this->mtx_);
return this->sem_.signal_all(HPX_MOVE(l));
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
return data->sem_.signal_all(HPX_MOVE(l));
}
};
} // namespace hpx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,37 @@ namespace hpx::lcos::local::detail {
std::ptrdiff_t value_;
local::detail::condition_variable cond_;
};

template <typename Mutex>
struct counting_semaphore_data
{
counting_semaphore_data(std::ptrdiff_t value) noexcept
: sem_(value)
, count_(1)
{
}

mutable Mutex mtx_;
detail::counting_semaphore sem_;

private:
friend void intrusive_ptr_add_ref(
counting_semaphore_data<Mutex>* p) noexcept
{
++p->count_;
}

friend void intrusive_ptr_release(
counting_semaphore_data<Mutex>* p) noexcept
{
if (0 == --p->count_)
{
delete p;
}
}

hpx::util::atomic_count count_;
};
} // namespace hpx::lcos::local::detail

#if defined(HPX_MSVC_WARNING_PRAGMA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ namespace hpx::lcos::local::detail {
std::int64_t lower_limit_;
local::detail::condition_variable cond_;
};

template <typename Mutex>
struct sliding_semaphore_data
{
sliding_semaphore_data(
std::int64_t max_difference, std::int64_t lower_limit) noexcept
: sem_(max_difference, lower_limit)
, count_(1)
{
}

mutable Mutex mtx_;
detail::sliding_semaphore sem_;

private:
friend void intrusive_ptr_add_ref(
sliding_semaphore_data<Mutex>* p) noexcept
{
++p->count_;
}

friend void intrusive_ptr_release(
sliding_semaphore_data<Mutex>* p) noexcept
{
if (0 == --p->count_)
{
delete p;
}
}

hpx::util::atomic_count count_;
};
} // namespace hpx::lcos::local::detail

#if defined(HPX_MSVC_WARNING_PRAGMA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma once

#include <hpx/config.hpp>
#include <hpx/memory/serialization/intrusive_ptr.hpp>
#include <hpx/synchronization/detail/sliding_semaphore.hpp>
#include <hpx/synchronization/spinlock.hpp>

Expand Down Expand Up @@ -43,8 +44,15 @@ namespace hpx {
{
private:
using mutex_type = Mutex;
using data_type =
lcos::local::detail::sliding_semaphore_data<mutex_type>;

public:
sliding_semaphore_var(sliding_semaphore_var const&) = delete;
sliding_semaphore_var& operator=(sliding_semaphore_var const&) = delete;
sliding_semaphore_var(sliding_semaphore_var&&) = delete;
sliding_semaphore_var& operator=(sliding_semaphore_var&&) = delete;

/// \brief Construct a new sliding semaphore
///
/// \param max_difference
Expand All @@ -55,7 +63,7 @@ namespace hpx {
/// \param lower_limit [in] The initial lower limit.
explicit sliding_semaphore_var(
std::int64_t max_difference, std::int64_t lower_limit = 0) noexcept
: sem_(max_difference, lower_limit)
: data_(new data_type(max_difference, lower_limit), false)
{
}

Expand All @@ -70,8 +78,9 @@ namespace hpx {
void set_max_difference(
std::int64_t max_difference, std::int64_t lower_limit = 0) noexcept
{
std::unique_lock<mutex_type> l(mtx_);
sem_.set_max_difference(l, max_difference, lower_limit);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
data->sem_.set_max_difference(l, max_difference, lower_limit);
}

/// \brief Wait for the semaphore to be signaled
Expand All @@ -82,8 +91,9 @@ namespace hpx {
/// set by signal() is larger than the max_difference.
void wait(std::int64_t upper_limit)
{
std::unique_lock<mutex_type> l(mtx_);
sem_.wait(l, upper_limit);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
data->sem_.wait(l, upper_limit);
}

/// \brief Try to wait for the semaphore to be signaled
Expand All @@ -97,8 +107,9 @@ namespace hpx {
/// would not block if it was calling wait().
bool try_wait(std::int64_t upper_limit = 1)
{
std::unique_lock<mutex_type> l(mtx_);
return sem_.try_wait(l, upper_limit);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
return data->sem_.try_wait(l, upper_limit);
}

/// \brief Signal the semaphore
Expand All @@ -110,19 +121,20 @@ namespace hpx {
/// limit plus the max_difference.
void signal(std::int64_t lower_limit)
{
std::unique_lock<mutex_type> l(mtx_);
sem_.signal(HPX_MOVE(l), lower_limit);
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
data->sem_.signal(HPX_MOVE(l), lower_limit);
}

std::int64_t signal_all()
{
std::unique_lock<mutex_type> l(mtx_);
return sem_.signal_all(HPX_MOVE(l));
auto data = data_; //keep alive
std::unique_lock<mutex_type> l(data->mtx_);
return data->sem_.signal_all(HPX_MOVE(l));
}

private:
mutable mutex_type mtx_;
lcos::local::detail::sliding_semaphore sem_;
hpx::intrusive_ptr<data_type> data_;
};

using sliding_semaphore = sliding_semaphore_var<>;
Expand Down

0 comments on commit 9e45942

Please sign in to comment.