diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 17bf5b909..d752722ce 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -119,12 +119,16 @@ jobs: run: ./bin/inv_wrapper.sh dev.cc faabric_tests - name: "Run tests" run: ./bin/inv_wrapper.sh tests + timeout-minutes: 10 dist-tests: if: github.event.pull_request.draft == false needs: [conan-cache] - runs-on: ubuntu-latest + runs-on: self-hosted env: + # Make a unique per-job cluster name, so that different instances can + # run in parallel + COMPOSE_PROJECT_NAME: faabric-gha-${{ github.job }}-${{ github.run_id }}-${{ github.run_attempt }} CONAN_CACHE_MOUNT_SOURCE: ~/.conan/ steps: # --- Code update --- @@ -136,9 +140,13 @@ jobs: run: ./dist-test/build.sh - name: "Run the distributed tests" run: ./dist-test/run.sh + timeout-minutes: 10 - name: "Print planner logs" if: always() run: docker compose logs planner + - name: "Chown all files to avoid docker-related root-owned files" + if: always() + run: sudo chown -R $(id -u):$(id -g) . examples: if: github.event.pull_request.draft == false diff --git a/include/faabric/mpi/MpiWorld.h b/include/faabric/mpi/MpiWorld.h index 8f9cb918c..97fb24f18 100644 --- a/include/faabric/mpi/MpiWorld.h +++ b/include/faabric/mpi/MpiWorld.h @@ -28,7 +28,7 @@ namespace faabric::mpi { // as the broker already has mocking capabilities std::vector getMpiMockedMessages(int sendRank); -typedef faabric::util::FixedCapacityQueue InMemoryMpiQueue; +typedef faabric::util::SpinLockQueue InMemoryMpiQueue; class MpiWorld { @@ -184,8 +184,6 @@ class MpiWorld std::shared_ptr getLocalQueue(int sendRank, int recvRank); - long getLocalQueueSize(int sendRank, int recvRank); - void overrideHost(const std::string& newHost); double getWTime(); diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h index 6d89aab18..9f9e2f164 100644 --- a/include/faabric/util/queue.h +++ b/include/faabric/util/queue.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -215,6 +216,48 @@ class FixedCapacityQueue moodycamel::BlockingReaderWriterCircularBuffer mq; }; +// High-performance, spin-lock single-producer, single-consumer queue. This +// queue spin-locks, so use at your own risk! +template +class SpinLockQueue +{ + public: + void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + while (!mq.push(value)) { + ; + }; + } + + T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + T value; + + while (!mq.pop(value)) { + ; + } + + return value; + } + + long size() + { + throw std::runtime_error("Size for fast queue unimplemented!"); + } + + void drain() + { + while (mq.pop()) { + ; + } + } + + void reset() { ; } + + private: + boost::lockfree::spsc_queue> mq; +}; + class TokenPool { public: diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp index 9839d7f96..cc8705dfc 100644 --- a/src/mpi/MpiWorld.cpp +++ b/src/mpi/MpiWorld.cpp @@ -1568,13 +1568,6 @@ int MpiWorld::getIndexForRanks(int sendRank, int recvRank) const return index; } -long MpiWorld::getLocalQueueSize(int sendRank, int recvRank) -{ - const std::shared_ptr& queue = - getLocalQueue(sendRank, recvRank); - return queue->size(); -} - double MpiWorld::getWTime() { double t = faabric::util::getTimeDiffMillis(creationTime); diff --git a/tests/test/mpi/test_multiple_mpi_worlds.cpp b/tests/test/mpi/test_multiple_mpi_worlds.cpp index a6e74f0b5..a1062e81b 100644 --- a/tests/test/mpi/test_multiple_mpi_worlds.cpp +++ b/tests/test/mpi/test_multiple_mpi_worlds.cpp @@ -155,29 +155,6 @@ TEST_CASE_METHOD(MultiWorldMpiTestFixture, worldB.send( rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size()); - SECTION("Test queueing") - { - // Check for world A - REQUIRE(worldA.getLocalQueueSize(rankA1, rankA2) == 1); - REQUIRE(worldA.getLocalQueueSize(rankA2, rankA1) == 0); - REQUIRE(worldA.getLocalQueueSize(rankA1, 0) == 0); - REQUIRE(worldA.getLocalQueueSize(rankA2, 0) == 0); - const std::shared_ptr& queueA2 = - worldA.getLocalQueue(rankA1, rankA2); - MpiMessage actualMessage = queueA2->dequeue(); - // checkMessage(actualMessage, worldId, rankA1, rankA2, messageData); - - // Check for world B - REQUIRE(worldB.getLocalQueueSize(rankA1, rankA2) == 1); - REQUIRE(worldB.getLocalQueueSize(rankA2, rankA1) == 0); - REQUIRE(worldB.getLocalQueueSize(rankA1, 0) == 0); - REQUIRE(worldB.getLocalQueueSize(rankA2, 0) == 0); - const std::shared_ptr& queueA2B = - worldB.getLocalQueue(rankA1, rankA2); - actualMessage = queueA2B->dequeue(); - // checkMessage(actualMessage, worldId, rankA1, rankA2, messageData); - } - SECTION("Test recv") { MPI_Status status{};