Skip to content

Commit

Permalink
queue: make abstract base class for faabric's queues and switch to si…
Browse files Browse the repository at this point in the history
…mpler lock-based queue
  • Loading branch information
csegarragonz committed Feb 23, 2024
1 parent ae55df8 commit f54ae94
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
3 changes: 1 addition & 2 deletions include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ struct MpiMessage {
// as the broker already has mocking capabilities
std::vector<std::shared_ptr<MPIMessage>> getMpiMockedMessages(int sendRank);

typedef faabric::util::FixedCapacityQueue<std::unique_ptr<MpiMessage>>
InMemoryMpiQueue;
typedef faabric::util::Queue<std::unique_ptr<MpiMessage>> InMemoryMpiQueue;

class MpiWorld
{
Expand Down
32 changes: 22 additions & 10 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,22 @@ class QueueTimeoutException : public faabric::util::FaabricException
};

template<typename T>
class Queue
class BaseQueue
{
virtual void enqueue(T value) = 0;

virtual T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) = 0;

virtual void drain() = 0;

virtual void reset() = 0;
};

template<typename T>
class Queue : public BaseQueue<T>
{
public:
void enqueue(T value)
void enqueue(T value) override
{
UniqueLock lock(mx);

Expand All @@ -46,7 +58,7 @@ class Queue
}
}

T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) override
{
UniqueLock lock(mx);

Expand Down Expand Up @@ -110,7 +122,7 @@ class Queue
}
}

void drain()
void drain() override
{
UniqueLock lock(mx);

Expand All @@ -125,7 +137,7 @@ class Queue
return mq.size();
}

void reset()
void reset() override
{
UniqueLock lock(mx);

Expand All @@ -144,7 +156,7 @@ class Queue
// consumer queue
// https://github.com/cameron314/readerwriterqueue
template<typename T>
class FixedCapacityQueue
class FixedCapacityQueue : public BaseQueue<T>
{
public:
FixedCapacityQueue(int capacity)
Expand All @@ -153,7 +165,7 @@ class FixedCapacityQueue
FixedCapacityQueue()
: mq(DEFAULT_QUEUE_SIZE){};

void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
void enqueue(T value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) override
{
if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
Expand All @@ -169,7 +181,7 @@ class FixedCapacityQueue

void dequeueIfPresent(T* res) { mq.try_dequeue(*res); }

T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) override
{
if (timeoutMs <= 0) {
SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
Expand All @@ -190,7 +202,7 @@ class FixedCapacityQueue
throw std::runtime_error("Peek not implemented");
}

void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
void drain(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) override
{
T value;
bool success;
Expand All @@ -204,7 +216,7 @@ class FixedCapacityQueue

long size() { return mq.size_approx(); }

void reset()
void reset() override
{
moodycamel::BlockingReaderWriterCircularBuffer<T> empty(
mq.max_capacity());
Expand Down

0 comments on commit f54ae94

Please sign in to comment.