Skip to content

Commit

Permalink
migration: fix re-entrant bug
Browse files Browse the repository at this point in the history
This PR fixes a rare race condition that only happened in enviornments
where the same app is migrated many times.

In particular, this bug only appeared when the same application migrated
away from one host, and then migrated back into it. Migrating into a new
host (wrt the previous scheduling decision) requires one of the
migrated-to ranks to run the world initialisation to set the
local-remote leaders and in-memory queues. However, the second migration
above was not triggering the "new world" migration procedure because the
world had lingered in the per-node registry.

This bug materialised in applications having an old version of the
host-port mappings, and failing to start.

The fix involves knowing when we are evicting a host for a given world
id, and clearing it from the registry if so.
  • Loading branch information
csegarragonz committed Apr 21, 2024
1 parent 7304a61 commit 0b6d532
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 9 deletions.
6 changes: 5 additions & 1 deletion include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class MpiWorld

int getSize() const;

void destroy();
// Returns true if the world is empty in this host and can be cleared from
// the registry
bool destroy();

void getCartesianRank(int rank,
int maxDims,
Expand Down Expand Up @@ -210,6 +212,8 @@ class MpiWorld
std::string thisHost;
faabric::util::TimePoint creationTime;

std::atomic<int> activeLocalRanks = 0;

std::atomic_flag isDestroyed = false;

std::string user;
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/mpi/MpiWorldRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class MpiWorldRegistry

bool worldExists(int worldId);

void clearWorld(int worldId);

void clear();

private:
Expand Down
28 changes: 26 additions & 2 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,19 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
if (msg.ismpi()) {
auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
bool mustClear =
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();

Check warning on line 403 in src/executor/Executor.cpp

View check run for this annotation

Codecov / codecov/patch

src/executor/Executor.cpp#L402-L403

Added lines #L402 - L403 were not covered by tests

if (mustClear) {
SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
msg.appid(),
msg.groupid(),
msg.groupidx(),
msg.mpiworldid(),
msg.executedhost());

Check warning on line 411 in src/executor/Executor.cpp

View check run for this annotation

Codecov / codecov/patch

src/executor/Executor.cpp#L405-L411

Added lines #L405 - L411 were not covered by tests

mpiWorldRegistry.clearWorld(msg.mpiworldid());

Check warning on line 413 in src/executor/Executor.cpp

View check run for this annotation

Codecov / codecov/patch

src/executor/Executor.cpp#L413

Added line #L413 was not covered by tests
}
}
}
} catch (const std::exception& ex) {
Expand All @@ -414,7 +426,19 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
if (msg.ismpi()) {
auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
bool mustClear =
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();

Check warning on line 430 in src/executor/Executor.cpp

View check run for this annotation

Codecov / codecov/patch

src/executor/Executor.cpp#L429-L430

Added lines #L429 - L430 were not covered by tests

if (mustClear) {
SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
msg.appid(),
msg.groupid(),
msg.groupidx(),
msg.mpiworldid(),
msg.executedhost());

Check warning on line 438 in src/executor/Executor.cpp

View check run for this annotation

Codecov / codecov/patch

src/executor/Executor.cpp#L432-L438

Added lines #L432 - L438 were not covered by tests

mpiWorldRegistry.clearWorld(msg.mpiworldid());

Check warning on line 440 in src/executor/Executor.cpp

View check run for this annotation

Codecov / codecov/patch

src/executor/Executor.cpp#L440

Added line #L440 was not covered by tests
}
}
}
}
Expand Down
32 changes: 27 additions & 5 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ struct MpiRankState
recvSocket.reset();
recvConnPool.clear();

#ifdef FAABRIC_USE_SPINLOCK
// Free the pinned-to CPU
pinnedCpu.reset();
#endif

// Local message count
msgCount = 1;
Expand Down Expand Up @@ -223,7 +225,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
initLocalQueues();
}

void MpiWorld::destroy()
bool MpiWorld::destroy()
{
if (rankState.msg != nullptr) {
SPDLOG_TRACE("{}:{}:{} destroying MPI world",
Expand All @@ -243,6 +245,13 @@ void MpiWorld::destroy()
mpiMockedMessages.clear();
}
#endif

// ----- Global accounting -----

int numActiveLocalRanks =
activeLocalRanks.fetch_sub(1, std::memory_order_acquire);

return numActiveLocalRanks == 1;
}

// Initialise shared (per-host) MPI world state. This method is called once
Expand All @@ -267,6 +276,7 @@ void MpiWorld::initialiseFromMsg(faabric::Message& msg)
void MpiWorld::initialiseRankFromMsg(faabric::Message& msg)
{
rankState.msg = &msg;
activeLocalRanks++;

// Pin this thread to a free CPU
#ifdef FAABRIC_USE_SPINLOCK
Expand Down Expand Up @@ -321,7 +331,7 @@ void MpiWorld::initLocalRemoteLeaders()
size);
throw std::runtime_error("MPI Group-World size mismatch!");
}
assert(rankIds.size() == size);

hostForRank.resize(size);
portForRank.resize(size);
for (const auto& rankId : rankIds) {
Expand Down Expand Up @@ -1797,11 +1807,12 @@ void MpiWorld::initSendRecvSockets()
try {
rankState.recvSocket->listen();
} catch (std::exception& e) {
SPDLOG_ERROR("{}:{}:{} Error binding recv socket! (this host: {})",
SPDLOG_ERROR("{}:{}:{} Error binding recv socket! (host: {}:{})",

Check warning on line 1810 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L1810

Added line #L1810 was not covered by tests
rankState.msg->appid(),
rankState.msg->groupid(),
rankState.msg->groupidx(),
thisHost);
thisHost,
thisPort);

Check warning on line 1815 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L1814-L1815

Added lines #L1814 - L1815 were not covered by tests
throw e;
}

Expand Down Expand Up @@ -1829,7 +1840,18 @@ void MpiWorld::initSendRecvSockets()
rankState.sendSockets.at(otherRank) =
std::make_unique<faabric::transport::tcp::SendSocket>(otherHost,
otherPort);
rankState.sendSockets.at(otherRank)->dial();
try {
rankState.sendSockets.at(otherRank)->dial();
} catch (std::exception& e) {
SPDLOG_ERROR(
"{}:{}:{} Error connecting with send socket to host: {}:{}",
rankState.msg->appid(),
rankState.msg->groupid(),
rankState.msg->groupidx(),
otherHost,
otherPort);
throw e;

Check warning on line 1853 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L1846-L1853

Added lines #L1846 - L1853 were not covered by tests
}
// Right after connecting, we send our rank over the raw socket to
// identify ourselves
rankState.sendSockets.at(otherRank)->sendOne((const uint8_t*)&thisRank,
Expand Down
5 changes: 5 additions & 0 deletions src/mpi/MpiWorldRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ MpiWorld& MpiWorldRegistry::getWorld(int worldId)
return *world;
}

void MpiWorldRegistry::clearWorld(int worldId)
{
worldMap.erase(worldId);

Check warning on line 91 in src/mpi/MpiWorldRegistry.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorldRegistry.cpp#L91

Added line #L91 was not covered by tests
}

bool MpiWorldRegistry::worldExists(int worldId)
{
return worldMap.contains(worldId);
Expand Down
15 changes: 14 additions & 1 deletion tests/dist/mpi/mpi_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,21 @@ static void notImplemented(const std::string& funcName)

int terminateMpi()
{
auto* msg = &faabric::executor::ExecutorContext::get()->getMsg();

// Destroy the MPI world
getExecutingWorld().destroy();
bool mustClear = getExecutingWorld().destroy();

if (mustClear) {
SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
msg->appid(),
msg->groupid(),
msg->groupidx(),
msg->mpiworldid(),
msg->executedhost());

getMpiWorldRegistry().clearWorld(msg->mpiworldid());
}

return MPI_SUCCESS;
}
Expand Down

0 comments on commit 0b6d532

Please sign in to comment.