diff --git a/include/faabric/mpi/MpiWorld.h b/include/faabric/mpi/MpiWorld.h index adee54137..2402d2e36 100644 --- a/include/faabric/mpi/MpiWorld.h +++ b/include/faabric/mpi/MpiWorld.h @@ -28,7 +28,7 @@ namespace faabric::mpi { // as the broker already has mocking capabilities std::vector> getMpiMockedMessages(int sendRank); -typedef faabric::util::FixedCapacityQueue> +typedef faabric::util::SpinLockQueue> InMemoryMpiQueue; class MpiWorld diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h index 6d89aab18..28462a32b 100644 --- a/include/faabric/util/queue.h +++ b/include/faabric/util/queue.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -215,6 +216,38 @@ class FixedCapacityQueue moodycamel::BlockingReaderWriterCircularBuffer mq; }; +// High-performance, spin-lock single-producer, single-consumer queue. This +// queue spin-locks, so use at your own risk! +template +class SpinLockQueue +{ + public: + void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { + while (!mq.push(value)) { ; }; + } + + T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) { + T value; + + while (!mq.pop(value)) { ; } + + return value; + } + + long size() { + throw std::runtime_error("Size for fast queue unimplemented!"); + } + + void drain() { + while (mq.pop()) { ; } + } + + void reset() { ; } + + private: + boost::lockfree::spsc_queue> mq; +}; + class TokenPool { public: diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp index cda95ed8e..aa335b094 100644 --- a/src/mpi/MpiWorld.cpp +++ b/src/mpi/MpiWorld.cpp @@ -530,7 +530,7 @@ void MpiWorld::send(int sendRank, if (isLocal) { SPDLOG_TRACE( "MPI - send {} -> {} ({})", sendRank, recvRank, messageType); - getLocalQueue(sendRank, recvRank)->enqueue(std::move(m)); + getLocalQueue(sendRank, recvRank)->enqueue(m); } else { SPDLOG_TRACE( "MPI - send remote {} -> {} ({})", sendRank, recvRank, messageType); @@ -1395,15 +1395,17 @@ void MpiWorld::allToAll(int rank, // queues. void MpiWorld::probe(int sendRank, int recvRank, MPI_Status* status) { + throw std::runtime_error("MPI_Probe not supported!"); + /* const std::shared_ptr& queue = getLocalQueue(sendRank, recvRank); - // 30/12/21 - Peek will throw a runtime error std::shared_ptr m = *(queue->peek()); faabric_datatype_t* datatype = getFaabricDatatypeFromId(m->type()); status->bytesSize = m->count() * datatype->size; status->MPI_ERROR = 0; status->MPI_SOURCE = m->sender(); + */ } void MpiWorld::barrier(int thisRank)