From 5878ad1e120f2bef0b404f1ae065621afc0b4458 Mon Sep 17 00:00:00 2001
From: Carlos Segarra <carlos@carlossegarra.com>
Date: Mon, 18 Mar 2024 10:58:14 +0000
Subject: [PATCH] threads: elastically scale up

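Add an elasticScaleHint flag to BatchExecuteRequest so that a
SCALE_CHANGE request (i.e. a fork) can opt in to elastic scale-up. When
the flag is set, the planner pads the batch with copies of its last
message until there is one message per free slot on the app's main
host, giving each copy a fresh app/group index and message id; if the
host has no spare capacity beyond what was requested, the batch is left
untouched. Requests with a pre-loaded scheduling decision bypass this
logic, and a pre-loaded decision is erased once it has been used for a
scale change. The hint is propagated to the per-host requests in
dispatchSchedulingDecision. Note that protobuf's C++ codegen lowercases
the field name, hence the elasticscalehint()/set_elasticscalehint()
accessors used in Planner.cpp.

For example, if the main host has 12 free slots and the fork requests 2
threads, 10 extra messages are appended and the batch grows to 12.

Minimal caller-side sketch (hypothetical, not part of this patch; it
assumes faabric's batchExecFactory helper and planner client API):

    #include <faabric/planner/PlannerClient.h>
    #include <faabric/util/func.h>

    // Build a fork request for two threads and mark it as elastic, so
    // the planner may grow it up to the main host's free slots
    auto req = faabric::util::batchExecFactory("demo", "echo", 2);
    req->set_type(faabric::BatchExecuteRequest::THREADS);
    req->set_elasticscalehint(true);

    faabric::planner::getPlannerClient().callFunctions(req);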
---
 src/planner/Planner.cpp | 56 ++++++++++++++++++++++++++++++++++++++++-
 src/proto/faabric.proto |  3 +++
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp
index 1eeff755f..c5b106cec 100644
--- a/src/planner/Planner.cpp
+++ b/src/planner/Planner.cpp
@@ -27,6 +27,14 @@ namespace faabric::planner {
 // Utility Functions
 // ----------------------
 
+static int availableSlots(std::shared_ptr<Host> host)
+{
+    int availableSlots = host->slots() - host->usedslots();
+    assert(availableSlots >= 0);
+
+    return availableSlots;
+}
+
 static void claimHostSlots(std::shared_ptr<Host> host, int slotsToClaim = 1)
 {
     host->set_usedslots(host->usedslots() + slotsToClaim);
@@ -769,8 +777,46 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     // does not modify it
     auto hostMapCopy =
       convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIps);
+    bool isScaleChange =
+      decisionType == faabric::batch_scheduler::DecisionType::SCALE_CHANGE;
     bool isDistChange =
       decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE;
+    bool existsPreloadedDec = state.preloadedSchedulingDecisions.contains(appId);
+
+    // For a SCALE_CHANGE decision (i.e. a fork) with the elastic flag set,
+    // we want to scale up to as many available slots as possible on the
+    // app's main host (we bypass this logic if a decision has been pre-loaded)
+    if (isScaleChange && req->elasticscalehint() && !existsPreloadedDec) {
+        SPDLOG_INFO("App {} requested to elastically scale-up", appId);
+        auto oldDec = state.inFlightReqs.at(appId).second;
+        auto mainHost = oldDec->hosts.at(0);
+
+        int numAvail = availableSlots(state.hostMap.at(mainHost));
+        int numRequested = req->messages_size();
+        int lastMsgIdx = req->messages(numRequested - 1).groupidx();
+        for (int itr = 0; itr < (numAvail - numRequested); itr++) {
+            // Differentiate between the position in the message array (itr)
+            // and the new group index. Usually, in a fork, they would be
+            // offset by one
+            int msgIdx = lastMsgIdx + itr + 1;
+            SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})", msgIdx, appId);
+
+            // To add a new message, copy from the last, and update the indexes
+            *req->add_messages() = req->messages(numRequested - 1);
+            req->mutable_messages(numRequested + itr)->set_appidx(msgIdx);
+            req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx);
+
+            // Also update the message id to make sure we can wait for and
+            // clean up the resources we use
+            req->mutable_messages(numRequested + itr)->set_id(faabric::util::generateGid());
+        }
+
+        if (numAvail > numRequested) {
+            SPDLOG_INFO("Elastically scaled-up app {} ({} -> {})", appId, numRequested, numAvail);
+        } else {
+            SPDLOG_INFO("Decided NOT to elastically scaled-up app {}", appId);
+        }
+    }
 
     // For a DIST_CHANGE decision (i.e. migration) we want to try to improve
     // on the old decision (we don't care about the one we send), so we make sure
@@ -797,8 +843,14 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     // (e.g. if we want to force a migration). Note that we don't want to check
     // pre-loaded decisions for dist-change requests
     std::shared_ptr<batch_scheduler::SchedulingDecision> decision = nullptr;
-    if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) {
+    if (!isDistChange && existsPreloadedDec) {
         decision = getPreloadedSchedulingDecision(appId, req);
+
+        // Once a pre-loaded decision has been used for a scale change, it
+        // is safe to remove it
+        if (isScaleChange) {
+            state.preloadedSchedulingDecisions.erase(appId);
+        }
     } else if (isNew && isMpi) {
         mpiReq = std::make_shared<BatchExecuteRequest>();
         *mpiReq = *req;
@@ -1139,6 +1191,8 @@ void Planner::dispatchSchedulingDecision(
             hostRequests[thisHost]->set_singlehost(isSingleHost);
             // Propagate the single host hint
             hostRequests[thisHost]->set_singlehosthint(req->singlehosthint());
+            // Propagate the elastic scaling hint
+            hostRequests[thisHost]->set_elasticscalehint(req->elasticscalehint());
         }
 
         *hostRequests[thisHost]->add_messages() = msg;
diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto
index 490e1cca2..56abf85dd 100644
--- a/src/proto/faabric.proto
+++ b/src/proto/faabric.proto
@@ -54,6 +54,9 @@ message BatchExecuteRequest {
     // Hint set by the user to hint that this execution should all be in a
     // single host
     bool singleHostHint = 11;
+
+    // Hint set by the user to make scale-up requests elastic
+    bool elasticScaleHint = 12;
 }
 
 message BatchExecuteRequestStatus {