Skip to content

Commit

Permalink
mpi: #385
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Mar 13, 2024
1 parent ba0d691 commit 71d3f79
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 34 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,16 @@ jobs:
run: ./bin/inv_wrapper.sh dev.cc faabric_tests
- name: "Run tests"
run: ./bin/inv_wrapper.sh tests
timeout-minutes: 10

dist-tests:
if: github.event.pull_request.draft == false
needs: [conan-cache]
runs-on: ubuntu-latest
runs-on: self-hosted
env:
# Make a unique per-job cluster name, so that different instances can
# run in parallel
COMPOSE_PROJECT_NAME: faabric-gha-${{ github.job }}-${{ github.run_id }}-${{ github.run_attempt }}
CONAN_CACHE_MOUNT_SOURCE: ~/.conan/
steps:
# --- Code update ---
Expand All @@ -136,9 +140,13 @@ jobs:
run: ./dist-test/build.sh
- name: "Run the distributed tests"
run: ./dist-test/run.sh
timeout-minutes: 10
- name: "Print planner logs"
if: always()
run: docker compose logs planner
- name: "Chown all files to avoid docker-related root-owned files"
if: always()
run: sudo chown -R $(id -u):$(id -g) .

examples:
if: github.event.pull_request.draft == false
Expand Down
4 changes: 1 addition & 3 deletions include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace faabric::mpi {
// as the broker already has mocking capabilities
std::vector<MpiMessage> getMpiMockedMessages(int sendRank);

typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue;
typedef faabric::util::SpinLockQueue<MpiMessage> InMemoryMpiQueue;

class MpiWorld
{
Expand Down Expand Up @@ -184,8 +184,6 @@ class MpiWorld

std::shared_ptr<InMemoryMpiQueue> getLocalQueue(int sendRank, int recvRank);

long getLocalQueueSize(int sendRank, int recvRank);

void overrideHost(const std::string& newHost);

double getWTime();
Expand Down
43 changes: 43 additions & 0 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>

#include <boost/lockfree/spsc_queue.hpp>
#include <condition_variable>
#include <queue>
#include <readerwriterqueue/readerwritercircularbuffer.h>
Expand Down Expand Up @@ -215,6 +216,48 @@ class FixedCapacityQueue
moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
};

// Single-producer, single-consumer queue backed by a boost::lockfree
// spsc_queue with a compile-time capacity of 1024 elements. Neither enqueue
// nor dequeue ever blocks or times out: both busy-wait (spin) until the
// underlying operation succeeds, so use at your own risk!
template<typename T>
class SpinLockQueue
{
  public:
    // Push value onto the queue, retrying in a tight loop until the push is
    // accepted. timeoutMs is part of the signature but is never consulted, so
    // this call spins for as long as the push keeps failing.
    void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
    {
        for (;;) {
            if (mq.push(value)) {
                return;
            }
        }
    }

    // Pop and return the next element, retrying in a tight loop until one is
    // available. timeoutMs is never consulted, so this call spins for as long
    // as the pop keeps failing.
    T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
    {
        T result;

        bool popped = false;
        do {
            popped = mq.pop(result);
        } while (!popped);

        return result;
    }

    // Querying the size is not supported for this queue: always throws
    // std::runtime_error.
    long size()
    {
        throw std::runtime_error("Size for fast queue unimplemented!");
    }

    // Discard elements one at a time until a pop reports that nothing was
    // removed.
    void drain()
    {
        bool removed = true;
        while (removed) {
            removed = mq.pop();
        }
    }

    // Intentionally a no-op
    void reset() {}

  private:
    boost::lockfree::spsc_queue<T, boost::lockfree::capacity<1024>> mq;
};

class TokenPool
{
public:
Expand Down
7 changes: 0 additions & 7 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1568,13 +1568,6 @@ int MpiWorld::getIndexForRanks(int sendRank, int recvRank) const
return index;
}

// Report the size of the local in-memory queue associated with the
// (sendRank, recvRank) pair, as returned by that queue's size() method.
long MpiWorld::getLocalQueueSize(int sendRank, int recvRank)
{
    return getLocalQueue(sendRank, recvRank)->size();
}

double MpiWorld::getWTime()
{
double t = faabric::util::getTimeDiffMillis(creationTime);
Expand Down
23 changes: 0 additions & 23 deletions tests/test/mpi/test_multiple_mpi_worlds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,29 +155,6 @@ TEST_CASE_METHOD(MultiWorldMpiTestFixture,
worldB.send(
rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size());

// Verifies the per-world local queue state for the rankA1 -> rankA2 pair:
// each world is expected to hold exactly one queued message in that
// direction, and none in the reverse direction or towards rank 0.
// NOTE(review): presumably relies on one send per world having been issued
// just before this section - confirm against the enclosing test case.
SECTION("Test queueing")
{
// Check for world A
REQUIRE(worldA.getLocalQueueSize(rankA1, rankA2) == 1);
REQUIRE(worldA.getLocalQueueSize(rankA2, rankA1) == 0);
REQUIRE(worldA.getLocalQueueSize(rankA1, 0) == 0);
REQUIRE(worldA.getLocalQueueSize(rankA2, 0) == 0);
// The sizes are asserted before dequeueing from the same queue below
const std::shared_ptr<InMemoryMpiQueue>& queueA2 =
worldA.getLocalQueue(rankA1, rankA2);
MpiMessage actualMessage = queueA2->dequeue();
// NOTE(review): the content check is disabled, so the dequeued message is
// not inspected here
// checkMessage(actualMessage, worldId, rankA1, rankA2, messageData);

// Check for world B: same rank pair, looked up through worldB
REQUIRE(worldB.getLocalQueueSize(rankA1, rankA2) == 1);
REQUIRE(worldB.getLocalQueueSize(rankA2, rankA1) == 0);
REQUIRE(worldB.getLocalQueueSize(rankA1, 0) == 0);
REQUIRE(worldB.getLocalQueueSize(rankA2, 0) == 0);
const std::shared_ptr<InMemoryMpiQueue>& queueA2B =
worldB.getLocalQueue(rankA1, rankA2);
actualMessage = queueA2B->dequeue();
// NOTE(review): the content check is disabled here as well
// checkMessage(actualMessage, worldId, rankA1, rankA2, messageData);
}

SECTION("Test recv")
{
MPI_Status status{};
Expand Down

0 comments on commit 71d3f79

Please sign in to comment.