Skip to content

Commit

Permalink
migration: fix re-entrant bug (#431)
Browse files Browse the repository at this point in the history
This PR fixes a rare race condition that only happened in enviornments
where the same app is migrated many times.

In particular, this bug only appeared when the same application migrated
away from one host, and then migrated back into it. Migrating into a new
host (wrt the previous scheduling decision) requires one of the
migrated-to ranks to run the world initialisation to set the
local-remote leaders and in-memory queues. However, the second migration
above was not triggering the "new world" migration procedure because the
world had lingered in the per-node registry.

This bug materialised in applications having an old version of the
host-port mappings, and failing to start.

The fix involves knowing when we are evicting a host for a given world
id, and clearing it from the registry if so.
  • Loading branch information
csegarragonz authored Apr 21, 2024
1 parent 7304a61 commit aef8f6e
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 9 deletions.
6 changes: 5 additions & 1 deletion include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class MpiWorld

int getSize() const;

void destroy();
// Returns true if the world is empty in this host and can be cleared from
// the registry
bool destroy();

void getCartesianRank(int rank,
int maxDims,
Expand Down Expand Up @@ -210,6 +212,8 @@ class MpiWorld
std::string thisHost;
faabric::util::TimePoint creationTime;

std::atomic<int> activeLocalRanks = 0;

std::atomic_flag isDestroyed = false;

std::string user;
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/mpi/MpiWorldRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class MpiWorldRegistry

bool worldExists(int worldId);

void clearWorld(int worldId);

void clear();

private:
Expand Down
28 changes: 26 additions & 2 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,19 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
if (msg.ismpi()) {
auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
bool mustClear =
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();

if (mustClear) {
SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
msg.appid(),
msg.groupid(),
msg.groupidx(),
msg.mpiworldid(),
msg.executedhost());

mpiWorldRegistry.clearWorld(msg.mpiworldid());
}
}
}
} catch (const std::exception& ex) {
Expand All @@ -414,7 +426,19 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
if (msg.ismpi()) {
auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
bool mustClear =
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();

if (mustClear) {
SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
msg.appid(),
msg.groupid(),
msg.groupidx(),
msg.mpiworldid(),
msg.executedhost());

mpiWorldRegistry.clearWorld(msg.mpiworldid());
}
}
}
}
Expand Down
32 changes: 27 additions & 5 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ struct MpiRankState
recvSocket.reset();
recvConnPool.clear();

#ifdef FAABRIC_USE_SPINLOCK
// Free the pinned-to CPU
pinnedCpu.reset();
#endif

// Local message count
msgCount = 1;
Expand Down Expand Up @@ -223,7 +225,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
initLocalQueues();
}

void MpiWorld::destroy()
bool MpiWorld::destroy()
{
if (rankState.msg != nullptr) {
SPDLOG_TRACE("{}:{}:{} destroying MPI world",
Expand All @@ -243,6 +245,13 @@ void MpiWorld::destroy()
mpiMockedMessages.clear();
}
#endif

// ----- Global accounting -----

int numActiveLocalRanks =
activeLocalRanks.fetch_sub(1, std::memory_order_acquire);

return numActiveLocalRanks == 1;
}

// Initialise shared (per-host) MPI world state. This method is called once
Expand All @@ -267,6 +276,7 @@ void MpiWorld::initialiseFromMsg(faabric::Message& msg)
void MpiWorld::initialiseRankFromMsg(faabric::Message& msg)
{
rankState.msg = &msg;
activeLocalRanks++;

// Pin this thread to a free CPU
#ifdef FAABRIC_USE_SPINLOCK
Expand Down Expand Up @@ -321,7 +331,7 @@ void MpiWorld::initLocalRemoteLeaders()
size);
throw std::runtime_error("MPI Group-World size mismatch!");
}
assert(rankIds.size() == size);

hostForRank.resize(size);
portForRank.resize(size);
for (const auto& rankId : rankIds) {
Expand Down Expand Up @@ -1797,11 +1807,12 @@ void MpiWorld::initSendRecvSockets()
try {
rankState.recvSocket->listen();
} catch (std::exception& e) {
SPDLOG_ERROR("{}:{}:{} Error binding recv socket! (this host: {})",
SPDLOG_ERROR("{}:{}:{} Error binding recv socket! (host: {}:{})",
rankState.msg->appid(),
rankState.msg->groupid(),
rankState.msg->groupidx(),
thisHost);
thisHost,
thisPort);
throw e;
}

Expand Down Expand Up @@ -1829,7 +1840,18 @@ void MpiWorld::initSendRecvSockets()
rankState.sendSockets.at(otherRank) =
std::make_unique<faabric::transport::tcp::SendSocket>(otherHost,
otherPort);
rankState.sendSockets.at(otherRank)->dial();
try {
rankState.sendSockets.at(otherRank)->dial();
} catch (std::exception& e) {
SPDLOG_ERROR(
"{}:{}:{} Error connecting with send socket to host: {}:{}",
rankState.msg->appid(),
rankState.msg->groupid(),
rankState.msg->groupidx(),
otherHost,
otherPort);
throw e;
}
// Right after connecting, we send our rank over the raw socket to
// identify ourselves
rankState.sendSockets.at(otherRank)->sendOne((const uint8_t*)&thisRank,
Expand Down
5 changes: 5 additions & 0 deletions src/mpi/MpiWorldRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ MpiWorld& MpiWorldRegistry::getWorld(int worldId)
return *world;
}

void MpiWorldRegistry::clearWorld(int worldId)
{
worldMap.erase(worldId);
}

bool MpiWorldRegistry::worldExists(int worldId)
{
return worldMap.contains(worldId);
Expand Down
15 changes: 14 additions & 1 deletion tests/dist/mpi/mpi_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,21 @@ static void notImplemented(const std::string& funcName)

int terminateMpi()
{
auto* msg = &faabric::executor::ExecutorContext::get()->getMsg();

// Destroy the MPI world
getExecutingWorld().destroy();
bool mustClear = getExecutingWorld().destroy();

if (mustClear) {
SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
msg->appid(),
msg->groupid(),
msg->groupidx(),
msg->mpiworldid(),
msg->executedhost());

getMpiWorldRegistry().clearWorld(msg->mpiworldid());
}

return MPI_SUCCESS;
}
Expand Down

0 comments on commit aef8f6e

Please sign in to comment.