Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
mpi: prototype MpiMessage struct
Browse files Browse the repository at this point in the history
csegarragonz committed Feb 27, 2024
1 parent c260d18 commit efa9c8a
Showing 6 changed files with 137 additions and 155 deletions.
41 changes: 41 additions & 0 deletions include/faabric/mpi/MpiMessage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <cstdint>
#include <vector>

namespace faabric::mpi {

/**
 * Kind of an MPI message.
 *
 * The underlying type is pinned to int32_t and every enumerator has an
 * explicit value, so the numeric value of each kind does not depend on the
 * order in which the enumerators are listed.
 */
enum MpiMessageType : int32_t
{
    NORMAL = 0,
    BARRIER_JOIN = 1,
    BARRIER_DONE = 2,
    SCATTER = 3,
    GATHER = 4,
    ALLGATHER = 5,
    REDUCE = 6,
    SCAN = 7,
    ALLREDUCE = 8,
    ALLTOALL = 9,
    SENDRECV = 10,
    BROADCAST = 11,
};

/**
 * Fixed-size description of one MPI message plus a pointer to its payload.
 *
 * The payload is `typeSize * count` bytes long (see payloadSize).
 */
struct MpiMessage
{
    int32_t id;
    int32_t worldId;
    int32_t sendRank;
    int32_t recvRank;
    // Size of one element and number of elements; their product is the
    // payload length in bytes
    int32_t typeSize;
    int32_t count;
    MpiMessageType messageType;
    // Start of the payload bytes.
    // NOTE(review): the struct declares no destructor, so it presumably does
    // not own this memory - confirm who allocates and frees it.
    void* buffer;
};

/**
 * Number of payload bytes described by `msg`, i.e. typeSize * count.
 *
 * Each operand is widened to size_t before multiplying. Multiplying the two
 * int32_t fields directly is done in `int` and overflows (undefined
 * behaviour) as soon as the payload exceeds INT32_MAX bytes.
 *
 * Negative field values are not validated; they wrap to a very large size_t.
 */
inline size_t payloadSize(const MpiMessage& msg)
{
    return static_cast<size_t>(msg.typeSize) * static_cast<size_t>(msg.count);
}

/**
 * Total number of bytes needed to hold `msg`: the MpiMessage struct itself
 * followed by its payload.
 */
inline size_t msgSize(const MpiMessage& msg)
{
    return sizeof(MpiMessage) + payloadSize(msg);
}

// Serialise `msg` into `buffer`.
// NOTE(review): src/mpi/MpiMessage.cpp in this commit defines `serialize`,
// not `serializeMpiMsg` - confirm which name is intended, otherwise callers
// of this declaration will fail to link.
void serializeMpiMsg(std::vector<uint8_t>& buffer, const MpiMessage& msg);

// Inverse of serializeMpiMsg: fill `msg` from `bytes`.
// NOTE(review): no definition is visible in this commit; ownership of the
// payload that `msg->buffer` ends up pointing at must be confirmed there.
void parseMpiMsg(const std::vector<uint8_t>& bytes, MpiMessage* msg);
}
9 changes: 5 additions & 4 deletions include/faabric/mpi/MpiMessageBuffer.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include <faabric/mpi/mpi.h>
#include <faabric/mpi/mpi.pb.h>
#include <faabric/mpi/MpiMessage.h>

#include <iterator>
#include <list>
#include <memory>

namespace faabric::mpi {
/* The MPI message buffer (MMB) keeps track of the asynchronous
@@ -25,17 +26,17 @@ class MpiMessageBuffer
{
public:
int requestId = -1;
std::shared_ptr<MPIMessage> msg = nullptr;
std::shared_ptr<MpiMessage> msg = nullptr;
int sendRank = -1;
int recvRank = -1;
uint8_t* buffer = nullptr;
faabric_datatype_t* dataType = nullptr;
int count = -1;
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL;
MpiMessageType messageType = MpiMessageType::NORMAL;

bool isAcknowledged() { return msg != nullptr; }

void acknowledge(std::shared_ptr<MPIMessage> msgIn) { msg = msgIn; }
void acknowledge(const MpiMessage& msgIn) { msg = std::make_shared<MpiMessage>(msgIn); }
};

/* Interface to query the buffer size */
39 changes: 14 additions & 25 deletions include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include <faabric/mpi/MpiMessage.h>
#include <faabric/mpi/MpiMessageBuffer.h>
#include <faabric/mpi/mpi.h>
#include <faabric/mpi/mpi.pb.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/transport/PointToPointBroker.h>
@@ -21,25 +21,14 @@

namespace faabric::mpi {

// Plain aggregate describing a single MPI message.
// NOTE(review): this page also shows an MpiMessage definition in
// include/faabric/mpi/MpiMessage.h that carries an extra `messageType`
// field; presumably this copy is the one the commit removes - confirm.
struct MpiMessage {
int32_t id;
int32_t worldId;
int32_t sendRank;
int32_t recvRank;
// Element size and element count of the payload
int32_t typeSize;
int32_t count;
// Payload pointer; ownership is not expressed by this struct
void* buffer;
};

// -----------------------------------
// Mocking
// -----------------------------------
// MPITOPTP - mocking at the MPI level won't be needed when using the PTP broker
// as the broker already has mocking capabilities
std::vector<std::shared_ptr<MPIMessage>> getMpiMockedMessages(int sendRank);
std::vector<MpiMessage> getMpiMockedMessages(int sendRank);

typedef faabric::util::FixedCapacityQueue<std::unique_ptr<MpiMessage>>
InMemoryMpiQueue;
typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue;

class MpiWorld
{
@@ -83,36 +72,36 @@ class MpiWorld
const uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

int isend(int sendRank,
int recvRank,
const uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

void broadcast(int rootRank,
int thisRank,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

void recv(int sendRank,
int recvRank,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

int irecv(int sendRank,
int recvRank,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

void awaitAsyncRequest(int requestId);

@@ -250,16 +239,16 @@ class MpiWorld
void sendRemoteMpiMessage(std::string dstHost,
int sendRank,
int recvRank,
std::unique_ptr<MpiMessage> msg);
const MpiMessage& msg);

std::shared_ptr<MPIMessage> recvRemoteMpiMessage(int sendRank,
MpiMessage recvRemoteMpiMessage(int sendRank,
int recvRank);

// Support for asynchronous communications
std::shared_ptr<MpiMessageBuffer> getUnackedMessageBuffer(int sendRank,
int recvRank);

std::unique_ptr<MPIMessage> recvBatchReturnLast(int sendRank,
MpiMessage recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);

@@ -268,19 +257,19 @@ class MpiWorld
void checkRanksRange(int sendRank, int recvRank);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::shared_ptr<MPIMessage>& m,
void doRecv(const MpiMessage& m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::unique_ptr<MpiMessage> m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);
};
}
1 change: 1 addition & 0 deletions src/mpi/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ if (NOT ("${CMAKE_PROJECT_NAME}" STREQUAL "faabricmpi"))

faabric_lib(mpi
MpiContext.cpp
MpiMessage.cpp
MpiMessageBuffer.cpp
MpiWorld.cpp
MpiWorldRegistry.cpp
13 changes: 13 additions & 0 deletions src/mpi/MpiMessage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include <faabric/mpi/MpiMessage.h>

#include <cstdint>
#include <cstring>

namespace faabric::mpi {

void serialize(std::vector<uint8_t>& buffer, const MpiMessage& msg)
{
std::memcpy(buffer.data(), &msg, sizeof(MpiMessage));
std::memcpy(buffer.data() + sizeof(MpiMessage), msg.buffer, payloadSize(msg));
}
}
189 changes: 63 additions & 126 deletions src/mpi/MpiWorld.cpp

Large diffs are not rendered by default.

0 comments on commit efa9c8a

Please sign in to comment.