Skip to content

Commit

Permalink
Add HTTP endpoint to get in-flight apps (#353)
Browse files Browse the repository at this point in the history
* planner: remove messages from in-flight when setting message result

* scheduler: add method to remove message from scheduling decision

* tests: add unit test

* add method to get batch results and make sure tests pass when we remove things from in-flight

* gh: bump minor version

* planner: clear scheduling state upon reset (and add specific flush method)

* tests: fix racing mpi test

* dist-tests: set the ptp server threads to avoid racy deadlocks

* dist-tests: simplify variable setting
  • Loading branch information
csegarragonz authored Sep 7, 2023
1 parent 6af1cda commit e98e917
Show file tree
Hide file tree
Showing 21 changed files with 421 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FAABRIC_VERSION=0.7.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.7.0
FAABRIC_VERSION=0.8.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.8.0
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
12 changes: 6 additions & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.7.0
image: faasm.azurecr.io/faabric:0.8.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -36,7 +36,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.7.0
image: faasm.azurecr.io/faabric:0.8.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -50,7 +50,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.7.0
image: faasm.azurecr.io/faabric:0.8.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -73,7 +73,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.7.0
image: faasm.azurecr.io/faabric:0.8.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.7.0
image: faasm.azurecr.io/faabric:0.8.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -167,7 +167,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.7.0
image: faasm.azurecr.io/faabric:0.8.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.0
0.8.0
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ services:
- LOG_LEVEL=${LOG_LEVEL:-debug}
- PLANNER_HOST=planner
- PLANNER_PORT=8080
# See faasm/faabric#335
- POINT_TO_POINT_SERVER_THREADS=8
- REDIS_STATE_HOST=redis
- REDIS_QUEUE_HOST=redis
- OVERRIDE_CPU_COUNT=${OVERRIDE_CPU_COUNT:-0}
Expand All @@ -50,6 +52,8 @@ services:
- LOG_LEVEL=debug
- PLANNER_HOST=planner
- PLANNER_PORT=8080
# See faasm/faabric#335
- POINT_TO_POINT_SERVER_THREADS=8
- REDIS_STATE_HOST=redis
- REDIS_QUEUE_HOST=redis
- OVERRIDE_CPU_COUNT=${OVERRIDE_CPU_COUNT:-0}
Expand Down
1 change: 0 additions & 1 deletion include/faabric/batch-scheduler/BatchScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ typedef std::pair<std::shared_ptr<BatchExecuteRequest>,

typedef std::map<int32_t, InFlightPair> InFlightReqs;

// TODO: remove duplication with PlannerState
struct HostState
{
HostState(const std::string& ipIn, int slotsIn, int usedSlotsIn)
Expand Down
6 changes: 6 additions & 0 deletions include/faabric/planner/Planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ enum FlushType
NoFlushType = 0,
Hosts = 1,
Executors = 2,
SchedulingState = 3,
};

/* The planner is a standalone component that has a global view of the state
Expand Down Expand Up @@ -69,6 +70,9 @@ class Planner
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision>
getSchedulingDecision(std::shared_ptr<BatchExecuteRequest> req);

faabric::batch_scheduler::InFlightReqs getInFlightReqs();

// Main entrypoint to request the execution of batches
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> callBatch(
std::shared_ptr<BatchExecuteRequest> req);

Expand All @@ -91,6 +95,8 @@ class Planner

void flushExecutors();

void flushSchedulingState();

// ----------
// Host membership private API
// ----------
Expand Down
5 changes: 3 additions & 2 deletions include/faabric/planner/PlannerApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ enum PlannerCalls
// Scheduling calls
SetMessageResult = 8,
GetMessageResult = 9,
GetSchedulingDecision = 10,
CallBatch = 11,
GetBatchResults = 10,
GetSchedulingDecision = 11,
CallBatch = 12,
};
}
3 changes: 3 additions & 0 deletions include/faabric/planner/PlannerClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient
faabric::Message getMessageResult(const faabric::Message& msg,
int timeoutMs);

std::shared_ptr<faabric::BatchExecuteRequestStatus> getBatchResults(
std::shared_ptr<faabric::BatchExecuteRequest> req);

faabric::batch_scheduler::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req);

Expand Down
3 changes: 3 additions & 0 deletions include/faabric/planner/PlannerServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
std::unique_ptr<google::protobuf::Message> recvGetMessageResult(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetBatchResults(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetSchedulingDecision(
std::span<const uint8_t> buffer);

Expand Down
4 changes: 2 additions & 2 deletions src/batch-scheduler/SchedulingDecision.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ std::set<std::string> SchedulingDecision::uniqueHosts()

void SchedulingDecision::print(const std::string& logLevel)
{
std::string printedText;
std::string printedText = "Printing scheduling decision:";
printedText += fmt::format(
"-------------- Decision for App: {} ----------------\n", appId);
"\n-------------- Decision for App: {} ----------------\n", appId);
printedText += "MsgId\tAppId\tGroupId\tGrIdx\tHostIp\n";
// Modulo a big number so that we can get the UUIDs to fit within one tab
int formatBase = 1e6;
Expand Down
78 changes: 77 additions & 1 deletion src/planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ bool Planner::reset()
{
SPDLOG_INFO("Resetting planner");

flushSchedulingState();

flushHosts();

return true;
Expand All @@ -115,6 +117,10 @@ bool Planner::flush(faabric::planner::FlushType flushType)
SPDLOG_INFO("Planner flushing executors");
flushExecutors();
return true;
case faabric::planner::FlushType::SchedulingState:
SPDLOG_INFO("Planner flushing scheduling state");
flushSchedulingState();
return true;
default:
SPDLOG_ERROR("Unrecognised flush type");
return false;
Expand All @@ -138,6 +144,15 @@ void Planner::flushExecutors()
}
}

// Drop all scheduling bookkeeping held in the planner state: the result
// waiters, the stored per-app results, and the in-flight requests. The whole
// operation runs under a FullLock on plannerMx.
void Planner::flushSchedulingState()
{
    faabric::util::FullLock exclusiveLock(plannerMx);

    // The three containers are cleared independently of each other
    state.appResultWaiters.clear();
    state.appResults.clear();
    state.inFlightReqs.clear();
}

std::vector<std::shared_ptr<Host>> Planner::getAvailableHosts()
{
SPDLOG_DEBUG("Planner received request to get available hosts");
Expand Down Expand Up @@ -276,7 +291,52 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
// Set the result
state.appResults[appId][msgId] = msg;

// Dispatch an async message to all hosts that are waiting
// Remove the message from the in-flight requests
if (!state.inFlightReqs.contains(appId)) {
// We don't want to error if any client uses `setMessageResult`
// liberally. This means that it may happen that when we set a message
// result a second time, the app is already not in-flight
SPDLOG_DEBUG("Setting result for non-existant (or finished) app: {}",
appId);
} else {
auto req = state.inFlightReqs.at(appId).first;
auto decision = state.inFlightReqs.at(appId).second;

// Work out the message position in the BER
auto it = std::find_if(
req->messages().begin(), req->messages().end(), [&](auto innerMsg) {
return innerMsg.id() == msg->id();
});
if (it == req->messages().end()) {
// Ditto as before. We want to allow setting the message result
// more than once without breaking
SPDLOG_DEBUG(
"Setting result for non-existant (or finished) message: {}",
appId);
} else {
SPDLOG_DEBUG("Removing message {} from app {}", msg->id(), appId);

// Remove message from in-flight requests
req->mutable_messages()->erase(it);

// Remove message from decision
decision->removeMessage(msg->id());

// Remove pair altogether if no more messages left
if (req->messages_size() == 0) {
SPDLOG_INFO("Planner removing app {} from in-flight", appId);
assert(decision->nFunctions == 0);
assert(decision->hosts.empty());
assert(decision->messageIds.empty());
assert(decision->appIdxs.empty());
assert(decision->groupIdxs.empty());
state.inFlightReqs.erase(appId);
}
}
}

// Finally, dispatch an async message to all hosts that are waiting once
// all planner accounting is updated
if (state.appResultWaiters.find(msgId) != state.appResultWaiters.end()) {
for (const auto& host : state.appResultWaiters[msgId]) {
SPDLOG_DEBUG("Sending result to waiting host: {}", host);
Expand Down Expand Up @@ -370,6 +430,22 @@ Planner::getSchedulingDecision(std::shared_ptr<BatchExecuteRequest> req)
return state.inFlightReqs.at(appId).second;
}

// Return a snapshot of the in-flight requests, keyed by app id. Both the
// request and the scheduling decision of every entry are copy-constructed
// into fresh shared pointers, so the returned map does not share objects with
// the planner's own state. The copy is taken under a SharedLock on plannerMx.
faabric::batch_scheduler::InFlightReqs Planner::getInFlightReqs()
{
    faabric::util::SharedLock readLock(plannerMx);

    faabric::batch_scheduler::InFlightReqs snapshot;
    for (const auto& entry : state.inFlightReqs) {
        const auto& [liveReq, liveDecision] = entry.second;

        // Deliberate deep copies: dereference and copy-construct each object
        auto reqCopy = std::make_shared<BatchExecuteRequest>(*liveReq);
        auto decisionCopy =
          std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
            *liveDecision);

        snapshot[entry.first] =
          std::make_pair(std::move(reqCopy), std::move(decisionCopy));
    }

    return snapshot;
}

static faabric::batch_scheduler::HostMap convertToBatchSchedHostMap(
std::map<std::string, std::shared_ptr<Host>> hostMapIn)
{
Expand Down
12 changes: 12 additions & 0 deletions src/planner/PlannerClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,18 @@ faabric::Message PlannerClient::doGetMessageResult(
}
}

// This method is deliberately non-blocking, and returns all results tracked
// for a particular app.
//
// The request is sent with a GetBatchResults sync call, and the response is
// received directly into the heap-allocated status object that is returned.
// This avoids filling a stack temporary and then deep-copying it into a
// shared pointer.
std::shared_ptr<faabric::BatchExecuteRequestStatus>
PlannerClient::getBatchResults(
  std::shared_ptr<faabric::BatchExecuteRequest> req)
{
    auto berStatus = std::make_shared<faabric::BatchExecuteRequestStatus>();
    syncSend(PlannerCalls::GetBatchResults, req.get(), berStatus.get());

    return berStatus;
}

faabric::batch_scheduler::SchedulingDecision PlannerClient::callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req)
{
Expand Down
37 changes: 37 additions & 0 deletions src/planner/PlannerEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ void PlannerEndpointHandler::onRequest(
}
case faabric::planner::HttpMessage_Type_FLUSH_AVAILABLE_HOSTS: {
SPDLOG_DEBUG("Planner received FLUSH_AVAILABLE_HOSTS request");

bool success = faabric::planner::getPlanner().flush(
faabric::planner::FlushType::Hosts);

if (success) {
response.result(beast::http::status::ok);
response.body() = std::string("Flushed available hosts!");
Expand All @@ -75,6 +77,7 @@ void PlannerEndpointHandler::onRequest(
response.body() =
std::string("Failed flushing available hosts!");
}

return ctx.sendFunction(std::move(response));
}
case faabric::planner::HttpMessage_Type_FLUSH_EXECUTORS: {
Expand All @@ -90,6 +93,17 @@ void PlannerEndpointHandler::onRequest(
}
return ctx.sendFunction(std::move(response));
}
case faabric::planner::HttpMessage_Type_FLUSH_SCHEDULING_STATE: {
SPDLOG_DEBUG("Planner received FLUSH_SCHEDULING_STATE request");

faabric::planner::getPlanner().flush(
faabric::planner::FlushType::SchedulingState);

response.result(beast::http::status::ok);
response.body() = std::string("Flushed scheduling state!");

return ctx.sendFunction(std::move(response));
}
case faabric::planner::HttpMessage_Type_GET_AVAILABLE_HOSTS: {
SPDLOG_DEBUG("Planner received GET_AVAILABLE_HOSTS request");

Expand Down Expand Up @@ -153,6 +167,29 @@ void PlannerEndpointHandler::onRequest(
}
return ctx.sendFunction(std::move(response));
}
case faabric::planner::HttpMessage_Type_GET_IN_FLIGHT_APPS: {
SPDLOG_DEBUG("Planner received GET_IN_FLIGHT_APPS request");

// Get in-flight apps
auto inFlightApps =
faabric::planner::getPlanner().getInFlightReqs();

// Prepare response
faabric::planner::GetInFlightAppsResponse inFlightAppsResponse;
for (const auto& [appId, inFlightPair] : inFlightApps) {
auto decision = inFlightPair.second;
auto* inFlightAppResp = inFlightAppsResponse.add_apps();
inFlightAppResp->set_appid(appId);
for (const auto& hostIp : decision->hosts) {
inFlightAppResp->add_hostips(hostIp);
}
}

response.result(beast::http::status::ok);
response.body() =
faabric::util::messageToJson(inFlightAppsResponse);
return ctx.sendFunction(std::move(response));
}
case faabric::planner::HttpMessage_Type_EXECUTE_BATCH: {
// in: BatchExecuteRequest
// out: BatchExecuteRequestStatus
Expand Down
18 changes: 18 additions & 0 deletions src/planner/PlannerServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ std::unique_ptr<google::protobuf::Message> PlannerServer::doSyncRecv(
case PlannerCalls::GetMessageResult: {
return recvGetMessageResult(message.udata());
}
case PlannerCalls::GetBatchResults: {
return recvGetBatchResults(message.udata());
}
case PlannerCalls::GetSchedulingDecision: {
return recvGetSchedulingDecision(message.udata());
}
Expand Down Expand Up @@ -153,6 +156,21 @@ std::unique_ptr<google::protobuf::Message> PlannerServer::recvGetMessageResult(
return std::make_unique<faabric::Message>(*resultMsg);
}

// Handle a GetBatchResults call: parse the incoming BatchExecuteRequest, ask
// the planner for the batch results of its app id, and return them as a
// BatchExecuteRequestStatus. If the planner returns a null pointer, a
// default-constructed (empty) status is sent back instead.
std::unique_ptr<google::protobuf::Message> PlannerServer::recvGetBatchResults(
  std::span<const uint8_t> buffer)
{
    // PARSE_MSG leaves the deserialised request in `parsedMsg`
    PARSE_MSG(BatchExecuteRequest, buffer.data(), buffer.size());

    // Only the app id is needed, so read it straight from the parsed message
    // rather than copying the whole request into a shared pointer first
    auto berStatus = planner.getBatchResults(parsedMsg.appid());

    if (berStatus == nullptr) {
        return std::make_unique<BatchExecuteRequestStatus>();
    }

    return std::make_unique<BatchExecuteRequestStatus>(*berStatus);
}

std::unique_ptr<google::protobuf::Message>
PlannerServer::recvGetSchedulingDecision(std::span<const uint8_t> buffer)
{
Expand Down
Loading

0 comments on commit e98e917

Please sign in to comment.