Skip to content

Commit

Permalink
Merge branch 'spinlock' into spinlock-plus-struct-test
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Feb 28, 2024
2 parents 4c25119 + 7b284a2 commit d260e99
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ jobs:
if: github.event.pull_request.draft == false
needs: [conan-cache]
runs-on: ubuntu-latest
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -139,6 +140,7 @@ jobs:
if: github.event.pull_request.draft == false
needs: [conan-cache]
runs-on: ubuntu-latest
timeout-minutes: 20
env:
CONAN_CACHE_MOUNT_SOURCE: ~/.conan/
steps:
Expand Down
2 changes: 0 additions & 2 deletions include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ class MpiWorld

std::shared_ptr<InMemoryMpiQueue> getLocalQueue(int sendRank, int recvRank);

long getLocalQueueSize(int sendRank, int recvRank);

void overrideHost(const std::string& newHost);

double getWTime();
Expand Down
24 changes: 17 additions & 7 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,24 +222,34 @@ template<typename T>
class SpinLockQueue
{
public:
void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) {
while (!mq.push(value)) { ; };
void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
while (!mq.push(value)) {
;
};
}

T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) {
T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS)
{
T value;

while (!mq.pop(value)) { ; }
while (!mq.pop(value)) {
;
}

return value;
}

long size() {
long size()
{
throw std::runtime_error("Size for fast queue unimplemented!");
}

void drain() {
while (mq.pop()) { ; }
void drain()
{
while (mq.pop()) {
;
}
}

void reset() { ; }
Expand Down
7 changes: 0 additions & 7 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1562,13 +1562,6 @@ int MpiWorld::getIndexForRanks(int sendRank, int recvRank) const
return index;
}

long MpiWorld::getLocalQueueSize(int sendRank, int recvRank)
{
const std::shared_ptr<InMemoryMpiQueue>& queue =
getLocalQueue(sendRank, recvRank);
return queue->size();
}

double MpiWorld::getWTime()
{
double t = faabric::util::getTimeDiffMillis(creationTime);
Expand Down
23 changes: 0 additions & 23 deletions tests/test/mpi/test_multiple_mpi_worlds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,29 +155,6 @@ TEST_CASE_METHOD(MultiWorldMpiTestFixture,
worldB.send(
rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size());

SECTION("Test queueing")
{
// Check for world A
REQUIRE(worldA.getLocalQueueSize(rankA1, rankA2) == 1);
REQUIRE(worldA.getLocalQueueSize(rankA2, rankA1) == 0);
REQUIRE(worldA.getLocalQueueSize(rankA1, 0) == 0);
REQUIRE(worldA.getLocalQueueSize(rankA2, 0) == 0);
const std::shared_ptr<InMemoryMpiQueue>& queueA2 =
worldA.getLocalQueue(rankA1, rankA2);
MpiMessage actualMessage = queueA2->dequeue();
// checkMessage(actualMessage, worldId, rankA1, rankA2, messageData);

// Check for world B
REQUIRE(worldB.getLocalQueueSize(rankA1, rankA2) == 1);
REQUIRE(worldB.getLocalQueueSize(rankA2, rankA1) == 0);
REQUIRE(worldB.getLocalQueueSize(rankA1, 0) == 0);
REQUIRE(worldB.getLocalQueueSize(rankA2, 0) == 0);
const std::shared_ptr<InMemoryMpiQueue>& queueA2B =
worldB.getLocalQueue(rankA1, rankA2);
actualMessage = queueA2B->dequeue();
// checkMessage(actualMessage, worldId, rankA1, rankA2, messageData);
}

SECTION("Test recv")
{
MPI_Status status{};
Expand Down

0 comments on commit d260e99

Please sign in to comment.