Skip to content

Commit

Permalink
mpi: use boost's lockfree ringbuffer for local messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Feb 26, 2024
1 parent c9c8895 commit a7bb9f5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
2 changes: 1 addition & 1 deletion include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace faabric::mpi {
// as the broker already has mocking capabilities
std::vector<std::shared_ptr<MPIMessage>> getMpiMockedMessages(int sendRank);

typedef faabric::util::FixedCapacityQueue<std::shared_ptr<MPIMessage>>
typedef faabric::util::SpinLockQueue<std::shared_ptr<MPIMessage>>
InMemoryMpiQueue;

class MpiWorld
Expand Down
33 changes: 33 additions & 0 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>

#include <boost/lockfree/spsc_queue.hpp>
#include <condition_variable>
#include <queue>
#include <readerwriterqueue/readerwritercircularbuffer.h>
Expand Down Expand Up @@ -215,6 +216,38 @@ class FixedCapacityQueue
moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
};

// High-performance, spin-lock single-producer, single-consumer queue. This
// queue spin-locks, so use at your own risk!
template<typename T>
class SpinLockQueue
{
public:
void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) {
while (!mq.push(value)) { ; };
}

T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) {
T value;

while (!mq.pop(value)) { ; }

return value;
}

long size() {
throw std::runtime_error("Size for fast queue unimplemented!");
}

void drain() {
while (mq.pop()) { ; }
}

void reset() { ; }

private:
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<1024>> mq;
};

class TokenPool
{
public:
Expand Down
6 changes: 4 additions & 2 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ void MpiWorld::send(int sendRank,
if (isLocal) {
SPDLOG_TRACE(
"MPI - send {} -> {} ({})", sendRank, recvRank, messageType);
getLocalQueue(sendRank, recvRank)->enqueue(std::move(m));
getLocalQueue(sendRank, recvRank)->enqueue(m);
} else {
SPDLOG_TRACE(
"MPI - send remote {} -> {} ({})", sendRank, recvRank, messageType);
Expand Down Expand Up @@ -1395,15 +1395,17 @@ void MpiWorld::allToAll(int rank,
// queues.
void MpiWorld::probe(int sendRank, int recvRank, MPI_Status* status)
{
throw std::runtime_error("MPI_Probe not supported!");
/*
const std::shared_ptr<InMemoryMpiQueue>& queue =
getLocalQueue(sendRank, recvRank);
// 30/12/21 - Peek will throw a runtime error
std::shared_ptr<MPIMessage> m = *(queue->peek());
faabric_datatype_t* datatype = getFaabricDatatypeFromId(m->type());
status->bytesSize = m->count() * datatype->size;
status->MPI_ERROR = 0;
status->MPI_SOURCE = m->sender();
*/
}

void MpiWorld::barrier(int thisRank)
Expand Down

0 comments on commit a7bb9f5

Please sign in to comment.