planner(mpi): preload MPI SCALE_CHANGE decisions #422

Merged
merged 2 commits on Apr 21, 2024
.env (4 changes: 2 additions & 2 deletions)
@@ -1,4 +1,4 @@
-FAABRIC_VERSION=0.16.0
-FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.16.0
+FAABRIC_VERSION=0.17.0
+FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.17.0
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
.github/workflows/tests.yml (12 changes: 6 additions & 6 deletions)
@@ -20,7 +20,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
-image: faasm.azurecr.io/faabric:0.16.0
+image: faasm.azurecr.io/faabric:0.17.0
env:
DEPLOYMENT_TYPE: gha-ci
steps:
@@ -34,7 +34,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
-image: faasm.azurecr.io/faabric:0.16.0
+image: faasm.azurecr.io/faabric:0.17.0
steps:
- name: "Check out code"
uses: actions/checkout@v4
@@ -45,7 +45,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
-image: faasm.azurecr.io/faabric:0.16.0
+image: faasm.azurecr.io/faabric:0.17.0
steps:
- name: "Check out code"
uses: actions/checkout@v4
@@ -65,7 +65,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
-image: faasm.azurecr.io/faabric:0.16.0
+image: faasm.azurecr.io/faabric:0.17.0
options: --privileged
services:
redis:
@@ -104,7 +104,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
-image: faasm.azurecr.io/faabric:0.16.0
+image: faasm.azurecr.io/faabric:0.17.0
options: --privileged
services:
redis:
@@ -156,7 +156,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
-image: faasm.azurecr.io/faabric:0.16.0
+image: faasm.azurecr.io/faabric:0.17.0
services:
redis:
image: redis
VERSION (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
-0.16.0
+0.17.0
requirements.txt (5 changes: 3 additions & 2 deletions)
@@ -1,7 +1,8 @@
-black==22.3.0
+black>=24.4.0
breathe>=4.35.0
flake8>=7.0.0
invoke>=2.0.0
myst_parser>=2.0.0
-PyGithub==1.55
+PyGithub>=1.55
sphinx>=7.3.6
sphinx-rtd-theme>=2.0.0
src/planner/Planner.cpp (61 changes: 58 additions & 3 deletions)
@@ -17,6 +17,10 @@

#include <string>

+// Special group ID magic to indicate MPI decisions that we have preemptively
+// scheduled
+#define MPI_PRELOADED_DECISION_GROUPID -99

namespace faabric::planner {

// ----------------------
@@ -512,6 +516,8 @@ Planner::getPreloadedSchedulingDecision(
msg.id(),
decision->appIdxs.at(idxInDecision),
decision->groupIdxs.at(idxInDecision));
+filteredDecision->mpiPorts.at(filteredDecision->nFunctions - 1) =
+decision->mpiPorts.at(idxInDecision);
}
assert(filteredDecision->hosts.size() == ber->messages_size());

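To make the two added lines above concrete: when a preloaded decision is filtered down to the messages of an incoming batch, the MPI port that was assigned at preload time is now carried over as well. A minimal, self-contained sketch of that idea follows, using toy types rather than faabric's real SchedulingDecision (the struct, the filtering loop, and the host and port values are illustrative assumptions):

#include <cassert>
#include <cstdio>
#include <string>
#include <vector>

// Simplified stand-in for a scheduling decision: one slot per message
struct ToyDecision
{
    std::vector<int> messageIds;
    std::vector<std::string> hosts;
    std::vector<int> mpiPorts;

    void addMessage(int msgId, const std::string& host, int port)
    {
        messageIds.push_back(msgId);
        hosts.push_back(host);
        mpiPorts.push_back(port);
    }
};

// Keep only the messages listed in `wanted`, copying the host *and* the MPI
// port that was assigned when the decision was preloaded (the port copy is
// what the two added lines above introduce)
ToyDecision filterDecision(const ToyDecision& preloaded,
                           const std::vector<int>& wanted)
{
    ToyDecision filtered;
    for (int msgId : wanted) {
        for (size_t idx = 0; idx < preloaded.messageIds.size(); idx++) {
            if (preloaded.messageIds.at(idx) == msgId) {
                filtered.addMessage(msgId,
                                    preloaded.hosts.at(idx),
                                    preloaded.mpiPorts.at(idx));
            }
        }
    }
    assert(filtered.messageIds.size() == wanted.size());
    return filtered;
}

int main()
{
    // Decision preloaded for a 3-rank MPI world
    ToyDecision preloaded;
    preloaded.addMessage(1, "hostA", 8800);
    preloaded.addMessage(2, "hostA", 8801);
    preloaded.addMessage(3, "hostB", 8800);

    // A later SCALE_CHANGE request only carries ranks 1 and 2
    ToyDecision filtered = filterDecision(preloaded, { 2, 3 });
    for (size_t i = 0; i < filtered.messageIds.size(); i++) {
        std::printf("msg %d -> %s:%d\n",
                    filtered.messageIds.at(i),
                    filtered.hosts.at(i).c_str(),
                    filtered.mpiPorts.at(i));
    }
    return 0;
}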
@@ -622,12 +628,33 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
}
}

+// For a NEW decision of an MPI application, we know that it will be
+// followed up by a SCALE_CHANGE one, and that the mpi_world_size parameter
+// must be set. Thus, we can schedule slots for all the MPI ranks, and
+// consume them later as a preloaded scheduling decision
+bool isNew = decisionType == faabric::batch_scheduler::DecisionType::NEW;
+bool isMpi = req->messages(0).ismpi();
+std::shared_ptr<BatchExecuteRequest> mpiReq = nullptr;
+
// Check if there exists a pre-loaded scheduling decision for this app
// (e.g. if we want to force a migration). Note that we don't want to check
// pre-loaded decisions for dist-change requests
std::shared_ptr<batch_scheduler::SchedulingDecision> decision = nullptr;
if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) {
decision = getPreloadedSchedulingDecision(appId, req);
+} else if (isNew && isMpi) {
+mpiReq = faabric::util::batchExecFactory(
+req->user(), req->function(), req->messages(0).mpiworldsize());
+
+// Populate the temporary request
+mpiReq->mutable_messages()->at(0) = req->messages(0);
+faabric::util::updateBatchExecAppId(mpiReq, appId);
+for (int i = 0; i < mpiReq->messages_size(); i++) {
+mpiReq->mutable_messages()->at(i).set_groupidx(i);
+}
+
+decision = batchScheduler->makeSchedulingDecision(
+hostMapCopy, state.inFlightReqs, mpiReq);
} else {
decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, req);
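For context, here is a rough, self-contained sketch of what the new else-if branch above does conceptually: a NEW MPI request arrives with a single message whose mpiworldsize is set, and a temporary request with one message per rank (groupidx = rank) is what actually gets passed to the scheduler. The types and helper below are simplified stand-ins, not faabric's protobuf-generated BatchExecuteRequest or its batchExecFactory/updateBatchExecAppId helpers:

#include <cstdio>
#include <memory>
#include <vector>

// Toy message and request: stand-ins for faabric's protobuf types
struct ToyMessage
{
    int appId = 0;
    int groupIdx = 0;
    int mpiWorldSize = 0;
};

struct ToyRequest
{
    std::vector<ToyMessage> messages;
};

// Expand a NEW MPI request (a single message with world size N) into a
// temporary request with one message per MPI rank, so the scheduler can
// place every rank up front
std::shared_ptr<ToyRequest> expandMpiRequest(const ToyRequest& req)
{
    int worldSize = req.messages.at(0).mpiWorldSize;
    auto mpiReq = std::make_shared<ToyRequest>();
    mpiReq->messages.resize(worldSize);

    // The first message is the one the caller actually sent; the rest share
    // its app ID and get consecutive group indexes (i.e. MPI ranks)
    mpiReq->messages.at(0) = req.messages.at(0);
    for (int i = 0; i < worldSize; i++) {
        mpiReq->messages.at(i).appId = req.messages.at(0).appId;
        mpiReq->messages.at(i).mpiWorldSize = worldSize;
        mpiReq->messages.at(i).groupIdx = i;
    }

    return mpiReq;
}

int main()
{
    ToyRequest req;
    ToyMessage first;
    first.appId = 1234;
    first.mpiWorldSize = 4;
    req.messages.push_back(first);

    auto mpiReq = expandMpiRequest(req);
    std::printf("scheduling %zu messages for app %d\n",
                mpiReq->messages.size(),
                mpiReq->messages.at(0).appId);
    return 0;
}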
@@ -649,6 +676,9 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
return decision;
}

+// Skip claiming slots and ports if we have preemptively allocated them
+bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID;
+
// A scheduling decision will create a new PTP mapping and, as a
// consequence, a new group ID
int newGroupId = faabric::util::generateGid();
@@ -676,6 +706,23 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
decision->print();
#endif

+// For a NEW MPI decision that was not preloaded we have
+// preemptively scheduled all MPI messages but now we just need to
+// return the first one, and preload the rest
+if (isMpi && mpiReq != nullptr) {
+auto mpiDecision = std::make_shared<
+faabric::batch_scheduler::SchedulingDecision>(req->appid(),
+req->groupid());
+*mpiDecision = *decision;
+mpiDecision->groupId = MPI_PRELOADED_DECISION_GROUPID;
+state.preloadedSchedulingDecisions[appId] = mpiDecision;
+
+// Remove all messages that we do not have to dispatch now
+for (int i = 1; i < mpiDecision->messageIds.size(); i++) {
+decision->removeMessage(mpiDecision->messageIds.at(i));
+}
+}
+
// 2. For a new decision, we just add it to the in-flight map
state.inFlightReqs[appId] = std::make_pair(req, decision);

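The block above is the heart of the change: the decision covering all ranks is stashed as a preloaded decision, tagged with the sentinel group ID, and only the first (rank 0) message is dispatched now. A compact sketch of that bookkeeping with toy types (the map, the struct, and removeMessage below are simplifications, not faabric's API):

#include <cstdio>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Sentinel group ID marking decisions that were scheduled ahead of time
#define MPI_PRELOADED_DECISION_GROUPID -99

struct ToyDecision
{
    int groupId = 0;
    std::vector<int> messageIds;
    std::vector<std::string> hosts;

    void removeMessage(int msgId)
    {
        for (size_t i = 0; i < messageIds.size(); i++) {
            if (messageIds.at(i) == msgId) {
                messageIds.erase(messageIds.begin() + i);
                hosts.erase(hosts.begin() + i);
                return;
            }
        }
    }
};

// appId -> preloaded decision, consumed by later SCALE_CHANGE requests
std::map<int, std::shared_ptr<ToyDecision>> preloadedSchedulingDecisions;

// Keep a full copy of the decision for later, and strip the decision that is
// returned now down to just the first (rank 0) message
void preloadAndTrim(int appId, std::shared_ptr<ToyDecision> decision)
{
    auto preloaded = std::make_shared<ToyDecision>(*decision);
    preloaded->groupId = MPI_PRELOADED_DECISION_GROUPID;
    preloadedSchedulingDecisions[appId] = preloaded;

    // Everything except the first message will be dispatched when the MPI
    // world scales up, via the preloaded copy
    for (size_t i = 1; i < preloaded->messageIds.size(); i++) {
        decision->removeMessage(preloaded->messageIds.at(i));
    }
}

int main()
{
    auto decision = std::make_shared<ToyDecision>();
    decision->messageIds = { 1, 2, 3, 4 };
    decision->hosts = { "hostA", "hostA", "hostB", "hostB" };

    preloadAndTrim(1234, decision);
    std::printf("dispatch now: %zu message(s); preloaded: %zu\n",
                decision->messageIds.size(),
                preloadedSchedulingDecisions.at(1234)->messageIds.size());
    return 0;
}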
@@ -689,7 +736,9 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// with the _new_ messages being scheduled
for (int i = 0; i < decision->hosts.size(); i++) {
auto thisHost = state.hostMap.at(decision->hosts.at(i));
-claimHostSlots(thisHost);
+if (!skipClaim) {
+claimHostSlots(thisHost);
+}
}

// 2. For a scale change request, we want to update the BER with the
@@ -703,8 +752,14 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
for (int i = 0; i < req->messages_size(); i++) {
*oldReq->add_messages() = req->messages(i);
oldDec->addMessage(decision->hosts.at(i), req->messages(i));
-oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
-claimHostMpiPort(state.hostMap.at(decision->hosts.at(i)));
+if (!skipClaim) {
+oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
+claimHostMpiPort(state.hostMap.at(decision->hosts.at(i)));
+} else {
+assert(decision->mpiPorts.at(i) != 0);
+oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
+decision->mpiPorts.at(i);
+}
}

// 2.5.1. Log the updated decision in debug mode
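Finally, a sketch of how the skipClaim flag plays out on the SCALE_CHANGE path shown above: if the decision was preloaded, its MPI ports were already chosen when the whole world was scheduled, so they are reused instead of claimed again. The host map and claimHostMpiPort below are simplified stand-ins used only to illustrate the control flow, not faabric's real host map or port accounting:

#include <cassert>
#include <cstdio>
#include <map>
#include <string>
#include <vector>

#define MPI_PRELOADED_DECISION_GROUPID -99

// Per-host pool of MPI ports; claimHostMpiPort hands out the next free one
struct ToyHost
{
    int nextMpiPort = 8800;
};

int claimHostMpiPort(ToyHost& host)
{
    return host.nextMpiPort++;
}

struct ToyDecision
{
    int groupId = 0;
    std::vector<std::string> hosts;
    std::vector<int> mpiPorts;
};

// Assign MPI ports for the messages being added in a scale change. If the
// decision was preloaded, ports were already chosen when the whole world was
// scheduled, so reuse them instead of claiming fresh ones.
std::vector<int> assignScaleChangePorts(const ToyDecision& decision,
                                        std::map<std::string, ToyHost>& hostMap)
{
    bool skipClaim = decision.groupId == MPI_PRELOADED_DECISION_GROUPID;

    std::vector<int> ports;
    for (size_t i = 0; i < decision.hosts.size(); i++) {
        if (!skipClaim) {
            ports.push_back(claimHostMpiPort(hostMap.at(decision.hosts.at(i))));
        } else {
            // Preloaded decisions must carry a valid (non-zero) port already
            assert(decision.mpiPorts.at(i) != 0);
            ports.push_back(decision.mpiPorts.at(i));
        }
    }
    return ports;
}

int main()
{
    std::map<std::string, ToyHost> hostMap = { { "hostA", {} },
                                               { "hostB", {} } };

    ToyDecision preloaded;
    preloaded.groupId = MPI_PRELOADED_DECISION_GROUPID;
    preloaded.hosts = { "hostA", "hostB" };
    preloaded.mpiPorts = { 8801, 8800 };

    std::vector<int> ports = assignScaleChangePorts(preloaded, hostMap);
    std::printf("reused ports: %d, %d\n", ports.at(0), ports.at(1));
    return 0;
}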