From ddba013f726c92946f824aa8479ad7d418244c79 Mon Sep 17 00:00:00 2001
From: Carlos <carlos@carlossegarra.com>
Date: Sun, 5 May 2024 16:16:23 -0400
Subject: [PATCH] planner: introduce SPOT VMs policy (#433)

---
 .env                                          |   4 +-
 .github/workflows/tests.yml                   |  12 +-
 VERSION                                       |   2 +-
 .../faabric/batch-scheduler/BatchScheduler.h  |   5 +
 .../faabric/batch-scheduler/SpotScheduler.h   |  33 ++
 include/faabric/planner/Planner.h             |  13 +
 include/faabric/planner/PlannerState.h        |  18 +
 include/faabric/util/func.h                   |   9 +
 src/batch-scheduler/BatchScheduler.cpp        |   3 +
 src/batch-scheduler/CMakeLists.txt            |   1 +
 src/batch-scheduler/CompactScheduler.cpp      |   8 +-
 src/batch-scheduler/SpotScheduler.cpp         | 331 +++++++++++++
 src/executor/Executor.cpp                     |  29 +-
 src/planner/Planner.cpp                       | 291 ++++++++++-
 src/planner/PlannerEndpointHandler.cpp        |  67 +++
 src/planner/planner.proto                     |  21 +-
 src/scheduler/Scheduler.cpp                   |  33 +-
 tests/dist/mpi/mpi_native.cpp                 |  37 +-
 tests/dist/mpi/test_multiple_mpi_worlds.cpp   | 164 +++++++
 .../batch-scheduler/test_spot_scheduler.cpp   | 450 ++++++++++++++++++
 tests/test/planner/test_planner_endpoint.cpp  |  80 ++++
 tests/test/scheduler/test_scheduler.cpp       |   2 +-
 .../test_message_endpoint_client.cpp          |  15 +-
 tests/utils/faabric_utils.h                   |   5 +
 tests/utils/fixtures.h                        |   6 +
 tests/utils/planner_utils.cpp                 |  35 ++
 26 files changed, 1618 insertions(+), 56 deletions(-)
 create mode 100644 include/faabric/batch-scheduler/SpotScheduler.h
 create mode 100644 src/batch-scheduler/SpotScheduler.cpp
 create mode 100644 tests/test/batch-scheduler/test_spot_scheduler.cpp

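A minimal usage sketch of the planner API this patch introduces (not part of the diff; the IP address is a placeholder): switch the planner to the new SPOT policy, then mark the VM that the cloud provider will reclaim next.

    #include <faabric/planner/Planner.h>

    #include <set>
    #include <string>

    void simulateSpotEviction()
    {
        auto& planner = faabric::planner::getPlanner();

        // Switch the planner (and the batch scheduler) to the new SPOT policy
        planner.setPolicy("spot");

        // Simulate the cloud provider announcing that this VM will be
        // reclaimed next (placeholder IP). Subsequent DIST_CHANGE decisions
        // will migrate messages off this host or FREEZE them
        planner.setNextEvictedVm(std::set<std::string>{ "10.0.1.2" });
    }
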
diff --git a/.env b/.env
index a17db033a..a379ae624 100644
--- a/.env
+++ b/.env
@@ -1,4 +1,4 @@
-FAABRIC_VERSION=0.18.0
-FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.18.0
+FAABRIC_VERSION=0.19.0
+FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.19.0
 COMPOSE_PROJECT_NAME=faabric-dev
 CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index ecac5e483..cb924bb6b 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -20,7 +20,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.18.0
+      image: faasm.azurecr.io/faabric:0.19.0
     env:
       DEPLOYMENT_TYPE: gha-ci
     steps:
@@ -34,7 +34,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.18.0
+      image: faasm.azurecr.io/faabric:0.19.0
     steps:
       - name: "Check out code"
         uses: actions/checkout@v4
@@ -45,7 +45,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.18.0
+      image: faasm.azurecr.io/faabric:0.19.0
     steps:
       - name: "Check out code"
         uses: actions/checkout@v4
@@ -65,7 +65,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.18.0
+      image: faasm.azurecr.io/faabric:0.19.0
       options: --privileged
     services:
       redis:
@@ -104,7 +104,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.18.0
+      image: faasm.azurecr.io/faabric:0.19.0
       options: --privileged
     services:
       redis:
@@ -156,7 +156,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.18.0
+      image: faasm.azurecr.io/faabric:0.19.0
     services:
       redis:
         image: redis
diff --git a/VERSION b/VERSION
index 66333910a..1cf0537c3 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.18.0
+0.19.0
diff --git a/include/faabric/batch-scheduler/BatchScheduler.h b/include/faabric/batch-scheduler/BatchScheduler.h
index fd9974e7c..be3ceef02 100644
--- a/include/faabric/batch-scheduler/BatchScheduler.h
+++ b/include/faabric/batch-scheduler/BatchScheduler.h
@@ -12,6 +12,11 @@
 #define NOT_ENOUGH_SLOTS_DECISION                                              \
     faabric::batch_scheduler::SchedulingDecision(NOT_ENOUGH_SLOTS,             \
                                                  NOT_ENOUGH_SLOTS)
+#define MUST_FREEZE -97
+#define MUST_FREEZE_DECISION                                                   \
+    faabric::batch_scheduler::SchedulingDecision(MUST_FREEZE, MUST_FREEZE)
+
+#define MUST_EVICT_IP "E.VI.CT.ME"
 
 namespace faabric::batch_scheduler {
 
diff --git a/include/faabric/batch-scheduler/SpotScheduler.h b/include/faabric/batch-scheduler/SpotScheduler.h
new file mode 100644
index 000000000..ff4db3e92
--- /dev/null
+++ b/include/faabric/batch-scheduler/SpotScheduler.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <faabric/batch-scheduler/BatchScheduler.h>
+#include <faabric/batch-scheduler/SchedulingDecision.h>
+#include <faabric/util/batch.h>
+
+namespace faabric::batch_scheduler {
+
+// This batch scheduler behaves in the same way as BinPack for NEW and
+// SCALE_CHANGE requests, but for DIST_CHANGE it considers whether any of the
+// hosts in the host map have been tainted with the eviction mark, in which
+// case it first tries to migrate their messages to other running hosts and,
+// if there is not enough capacity, freezes the messages.
+class SpotScheduler final : public BatchScheduler
+{
+  public:
+    std::shared_ptr<SchedulingDecision> makeSchedulingDecision(
+      HostMap& hostMap,
+      const InFlightReqs& inFlightReqs,
+      std::shared_ptr<faabric::BatchExecuteRequest> req) override;
+
+  private:
+    bool isFirstDecisionBetter(
+      std::shared_ptr<SchedulingDecision> decisionA,
+      std::shared_ptr<SchedulingDecision> decisionB) override;
+
+    std::vector<Host> getSortedHosts(
+      HostMap& hostMap,
+      const InFlightReqs& inFlightReqs,
+      std::shared_ptr<faabric::BatchExecuteRequest> req,
+      const DecisionType& decisionType) override;
+};
+}
diff --git a/include/faabric/planner/Planner.h b/include/faabric/planner/Planner.h
index 0c28aeeee..3b9707fe2 100644
--- a/include/faabric/planner/Planner.h
+++ b/include/faabric/planner/Planner.h
@@ -33,6 +33,8 @@ class Planner
 
     void printConfig() const;
 
+    std::string getPolicy();
+
     void setPolicy(const std::string& newPolicy);
 
     // ----------
@@ -87,10 +89,21 @@ class Planner
     // the planner was last reset
     int getNumMigrations();
 
+    // Helper method to get the IPs of the host(s) that will be evicted next
+    std::set<std::string> getNextEvictedHostIps();
+
+    std::map<int32_t, std::shared_ptr<BatchExecuteRequest>> getEvictedReqs();
+
     // Main entrypoint to request the execution of batches
     std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> callBatch(
       std::shared_ptr<BatchExecuteRequest> req);
 
+    // ----------
+    // API exclusive to SPOT policy mode
+    // ----------
+
+    void setNextEvictedVm(const std::set<std::string>& vmIps);
+
   private:
     // There's a singleton instance of the planner running, but it must allow
     // concurrent requests
diff --git a/include/faabric/planner/PlannerState.h b/include/faabric/planner/PlannerState.h
index 92f77006a..5a5a73017 100644
--- a/include/faabric/planner/PlannerState.h
+++ b/include/faabric/planner/PlannerState.h
@@ -12,6 +12,10 @@ namespace faabric::planner {
  */
 struct PlannerState
 {
+    // Policy to operate the planner in. Mostly determines the batch scheduler
+    // behaviour, but in some cases also the planner's
+    std::string policy;
+
     // Accounting of the hosts that are registered in the system and responsive
     // We deliberately use the host's IP as unique key, but assign a unique host
     // id for redundancy
@@ -36,5 +40,19 @@ struct PlannerState
 
     // Helper coutner of the total number of migrations
     std::atomic<int> numMigrations = 0;
+
+    // -----
+    // Data structures used only under the SPOT policy
+    // -----
+
+    // Map containing the BERs that have been evicted due to a SPOT VM eviction.
+    // All messages in the VM have been checkpointed, are in the snapshot
+    // registry in the planner, and are ready to be scheduled when capacity
+    // appears
+    std::map<int, std::shared_ptr<BatchExecuteRequest>> evictedRequests;
+
+    // This variable simulates the values we would get from a cloud provider's
+    // API indicating the (set of) VMs to be evicted next
+    std::set<std::string> nextEvictedHostIps;
 };
 }
diff --git a/include/faabric/util/func.h b/include/faabric/util/func.h
index e8ec5b93e..35aa1ef1b 100644
--- a/include/faabric/util/func.h
+++ b/include/faabric/util/func.h
@@ -6,9 +6,18 @@
 #include <vector>
 
 #define MIGRATED_FUNCTION_RETURN_VALUE -99
+#define FROZEN_FUNCTION_RETURN_VALUE -98
 
 namespace faabric::util {
 
+class FunctionFrozenException : public faabric::util::FaabricException
+{
+  public:
+    explicit FunctionFrozenException(std::string message)
+      : FaabricException(std::move(message))
+    {}
+};
+
 class FunctionMigratedException : public faabric::util::FaabricException
 {
   public:
diff --git a/src/batch-scheduler/BatchScheduler.cpp b/src/batch-scheduler/BatchScheduler.cpp
index 58dc27d35..416a595d4 100644
--- a/src/batch-scheduler/BatchScheduler.cpp
+++ b/src/batch-scheduler/BatchScheduler.cpp
@@ -1,6 +1,7 @@
 #include <faabric/batch-scheduler/BatchScheduler.h>
 #include <faabric/batch-scheduler/BinPackScheduler.h>
 #include <faabric/batch-scheduler/CompactScheduler.h>
+#include <faabric/batch-scheduler/SpotScheduler.h>
 #include <faabric/util/config.h>
 #include <faabric/util/logging.h>
 
@@ -23,6 +24,8 @@ std::shared_ptr<BatchScheduler> getBatchScheduler()
         batchScheduler = std::make_shared<BinPackScheduler>();
     } else if (mode == "compact") {
         batchScheduler = std::make_shared<CompactScheduler>();
+    } else if (mode == "spot") {
+        batchScheduler = std::make_shared<SpotScheduler>();
     } else {
         SPDLOG_ERROR("Unrecognised batch scheduler mode: {}", mode);
         throw std::runtime_error("Unrecognised batch scheduler mode");
diff --git a/src/batch-scheduler/CMakeLists.txt b/src/batch-scheduler/CMakeLists.txt
index 7a73ddcfa..79ebdd789 100644
--- a/src/batch-scheduler/CMakeLists.txt
+++ b/src/batch-scheduler/CMakeLists.txt
@@ -7,6 +7,7 @@ faabric_lib(batch_scheduler
     BatchScheduler.cpp
     BinPackScheduler.cpp
     CompactScheduler.cpp
+    SpotScheduler.cpp
 )
 
 target_link_libraries(batch_scheduler PRIVATE
diff --git a/src/batch-scheduler/CompactScheduler.cpp b/src/batch-scheduler/CompactScheduler.cpp
index f37270dc4..b623afe18 100644
--- a/src/batch-scheduler/CompactScheduler.cpp
+++ b/src/batch-scheduler/CompactScheduler.cpp
@@ -98,7 +98,7 @@ bool CompactScheduler::isFirstDecisionBetter(
     throw std::runtime_error("Method not supported for COMPACT scheduler");
 }
 
-HostMap deepCopyHostMap(const HostMap& hostMap)
+static HostMap deepCopyHostMap(const HostMap& hostMap)
 {
     HostMap newHostMap;
 
@@ -173,9 +173,9 @@ bool CompactScheduler::isFirstDecisionBetter(
 
 // Filter-out from the host map all nodes that are executing requests from a
 // different user
-void filterHosts(HostMap& hostMap,
-                 const InFlightReqs& inFlightReqs,
-                 std::shared_ptr<faabric::BatchExecuteRequest> req)
+static void filterHosts(HostMap& hostMap,
+                        const InFlightReqs& inFlightReqs,
+                        std::shared_ptr<faabric::BatchExecuteRequest> req)
 {
     // We temporarily use the request subtype field to attach a user id for our
     // multi-tenant simulations
diff --git a/src/batch-scheduler/SpotScheduler.cpp b/src/batch-scheduler/SpotScheduler.cpp
new file mode 100644
index 000000000..dd1922ebd
--- /dev/null
+++ b/src/batch-scheduler/SpotScheduler.cpp
@@ -0,0 +1,331 @@
+#include <faabric/batch-scheduler/SchedulingDecision.h>
+#include <faabric/batch-scheduler/SpotScheduler.h>
+#include <faabric/util/batch.h>
+#include <faabric/util/logging.h>
+
+namespace faabric::batch_scheduler {
+
+static std::map<std::string, int> getHostFreqCount(
+  std::shared_ptr<SchedulingDecision> decision)
+{
+    std::map<std::string, int> hostFreqCount;
+    for (auto host : decision->hosts) {
+        hostFreqCount[host] += 1;
+    }
+
+    return hostFreqCount;
+}
+
+// Given a new decision that improves on an old decision (i.e. to migrate), we
+// want to make sure that we minimise the number of migration requests we send.
+// That is, we want to keep as many host-message assignments from the old
+// decision as possible, while still matching the overall locality of the new
+// decision (i.e. its host-message histogram)
+// NOTE: keep in mind that the newDecision has the right host histogram, but
+// the messages may be completely out-of-order
+static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(
+  std::shared_ptr<SchedulingDecision> newDecision,
+  std::shared_ptr<SchedulingDecision> oldDecision)
+{
+    auto decision = std::make_shared<SchedulingDecision>(oldDecision->appId,
+                                                         oldDecision->groupId);
+
+    // We want to maintain the new decision's host-message histogram
+    auto hostFreqCount = getHostFreqCount(newDecision);
+
+    // Helper function to find the next host in the histogram with slots
+    auto nextHostWithSlots = [&hostFreqCount]() -> std::string {
+        for (auto [ip, slots] : hostFreqCount) {
+            if (slots > 0) {
+                return ip;
+            }
+        }
+
+        // Unreachable (in this context)
+        throw std::runtime_error("No next host with slots found!");
+    };
+
+    assert(newDecision->hosts.size() == oldDecision->hosts.size());
+
+    // First we try to allocate to each message the same host they used to have
+    for (int i = 0; i < oldDecision->hosts.size(); i++) {
+        auto oldHost = oldDecision->hosts.at(i);
+
+        if (hostFreqCount.contains(oldHost) && hostFreqCount.at(oldHost) > 0) {
+            decision->addMessageInPosition(i,
+                                           oldHost,
+                                           oldDecision->messageIds.at(i),
+                                           oldDecision->appIdxs.at(i),
+                                           oldDecision->groupIdxs.at(i),
+                                           oldDecision->mpiPorts.at(i));
+
+            hostFreqCount.at(oldHost) -= 1;
+        }
+    }
+
+    // Second we allocate the rest
+    for (int i = 0; i < oldDecision->hosts.size(); i++) {
+        if (decision->nFunctions <= i || decision->hosts.at(i).empty()) {
+
+            auto nextHost = nextHostWithSlots();
+            decision->addMessageInPosition(i,
+                                           nextHost,
+                                           oldDecision->messageIds.at(i),
+                                           oldDecision->appIdxs.at(i),
+                                           oldDecision->groupIdxs.at(i),
+                                           -1);
+
+            hostFreqCount.at(nextHost) -= 1;
+        }
+    }
+
+    // Assert that we have preserved the new decision's host-message histogram
+    // (use the pre-processor macro as we assert repeatedly in the loop, so we
+    // want to avoid having an empty loop in non-debug mode)
+#ifndef NDEBUG
+    for (auto [host, freq] : hostFreqCount) {
+        assert(freq == 0);
+    }
+#endif
+
+    return decision;
+}
+
+bool SpotScheduler::isFirstDecisionBetter(
+  std::shared_ptr<SchedulingDecision> decisionA,
+  std::shared_ptr<SchedulingDecision> decisionB)
+{
+    throw std::runtime_error("Method not supported for SPOT scheduler");
+}
+
+// Filter out from the host map the VM(s) that will be evicted next
+static std::set<std::string> filterHosts(HostMap& hostMap)
+{
+    std::set<std::string> ipsToRemove;
+
+    for (const auto& [hostIp, host] : hostMap) {
+        if (host->ip == MUST_EVICT_IP) {
+            ipsToRemove.insert(hostIp);
+        }
+    }
+
+    for (const auto& ipToRemove : ipsToRemove) {
+        hostMap.erase(ipToRemove);
+    }
+
+    return ipsToRemove;
+}
+
+std::vector<Host> SpotScheduler::getSortedHosts(
+  HostMap& hostMap,
+  const InFlightReqs& inFlightReqs,
+  std::shared_ptr<faabric::BatchExecuteRequest> req,
+  const DecisionType& decisionType)
+{
+    std::vector<Host> sortedHosts;
+    for (auto [ip, host] : hostMap) {
+        sortedHosts.push_back(host);
+    }
+
+    std::shared_ptr<SchedulingDecision> oldDecision = nullptr;
+    std::map<std::string, int> hostFreqCount;
+    if (decisionType != DecisionType::NEW) {
+        oldDecision = inFlightReqs.at(req->appid()).second;
+        hostFreqCount = getHostFreqCount(oldDecision);
+    }
+
+    auto isFirstHostLarger = [&](const Host& hostA, const Host& hostB) -> bool {
+        // The SPOT scheduler sorts hosts by number of available slots
+        int nAvailableA = numSlotsAvailable(hostA);
+        int nAvailableB = numSlotsAvailable(hostB);
+        if (nAvailableA != nAvailableB) {
+            return nAvailableA > nAvailableB;
+        }
+
+        // In case of a tie, it will pick larger hosts first
+        int nSlotsA = numSlots(hostA);
+        int nSlotsB = numSlots(hostB);
+        if (nSlotsA != nSlotsB) {
+            return nSlotsA > nSlotsB;
+        }
+
+        // Lastly, in case of a tie, return the largest host alphabetically
+        return getIp(hostA) > getIp(hostB);
+    };
+
+    auto isFirstHostLargerWithFreq = [&](auto hostA, auto hostB) -> bool {
+        // When updating an existing scheduling decision (SCALE_CHANGE or
+        // DIST_CHANGE), the SPOT scheduler takes into consideration the
+        // existing host-message histogram (i.e. how many messages for this
+        // app each host _already_ runs). This behaviour is the same as in the
+        // BIN_PACK and COMPACT policies
+
+        int numInHostA = hostFreqCount.contains(getIp(hostA))
+                           ? hostFreqCount.at(getIp(hostA))
+                           : 0;
+        int numInHostB = hostFreqCount.contains(getIp(hostB))
+                           ? hostFreqCount.at(getIp(hostB))
+                           : 0;
+
+        // If at least one of the hosts has messages for this request, return
+        // the host with more messages for this request (note that it is
+        // possible that this host has no available slots at all, in which
+        // case we will just pack 0 messages here, but we still want to sort
+        // it first nonetheless)
+        if (numInHostA != numInHostB) {
+            return numInHostA > numInHostB;
+        }
+
+        // In case of a tie, use the same criteria as for NEW
+        return isFirstHostLarger(hostA, hostB);
+    };
+
+    switch (decisionType) {
+        case DecisionType::NEW: {
+            // For a NEW decision type, the SPOT scheduler just sorts the
+            // hosts in decreasing order of capacity, and bin-packs messages
+            // to hosts in this order. The one caveat is that it skips the
+            // VM(s) that we know will be evicted next
+            std::sort(
+              sortedHosts.begin(), sortedHosts.end(), isFirstHostLarger);
+
+            break;
+        }
+        case DecisionType::SCALE_CHANGE: {
+            // If we are changing the scale of a running app (i.e. via chaining
+            // or thread/process forking) we want to prioritise co-locating
+            // as much as possible. This means that we will sort first by the
+            // frequency of messages of the running app, and second with the
+            // same criteria as NEW
+            // IMPORTANT: a SCALE_CHANGE request with 4 messages means that we
+            // want to add 4 NEW messages to the running app (not that the new
+            // total count is 4)
+            std::sort(sortedHosts.begin(),
+                      sortedHosts.end(),
+                      isFirstHostLargerWithFreq);
+
+            break;
+        }
+        case DecisionType::DIST_CHANGE: {
+            // A DIST_CHANGE with the SPOT scheduler means that, if the app
+            // is running any messages of the to-be-evicted VM, we must move
+            // them from there. Two things may happen:
+            // * We have slots to move them to (equivalent to re-scheduling
+            //   from scratch without the tainted VM)
+            // * We do not have slots to move them to, in which case all
+            //   messages need to freeze until there is capacity in the cluster
+            //   again
+
+            auto oldDecision = inFlightReqs.at(req->appid()).second;
+            auto hostFreqCount = getHostFreqCount(oldDecision);
+
+            // First free the slots the app currently occupies to get a fresh
+            // shot at the scheduling
+            for (auto host : sortedHosts) {
+                if (hostFreqCount.contains(getIp(host))) {
+                    freeSlots(host, hostFreqCount.at(getIp(host)));
+                }
+            }
+
+            // Try to schedule again without the tainted VM. Note that this
+            // app may not be using the tainted VM _at all_ in which case we
+            // will just discard the suggested migration.
+            std::sort(sortedHosts.begin(),
+                      sortedHosts.end(),
+                      isFirstHostLargerWithFreq);
+
+            break;
+        }
+        default: {
+            SPDLOG_ERROR("Unrecognised decision type: {}", decisionType);
+            throw std::runtime_error("Unrecognised decision type");
+        }
+    }
+
+    return sortedHosts;
+}
+
+// The BinPack scheduler's decision algorithm is very simple. It first sorts
+// hosts (i.e. bins) in a specific order (depending on the scheduling type),
+// and then starts filling bins from beginning to end, until it runs out of
+// messages to schedule. The SPOT scheduler behaves as the BinPack for
+// NEW and SCALE_CHANGE requests, with two caveats:
+// - it avoids assigning any messages to a host that is going to be evicted
+// - when migrating, it will check if the migration candidate has any messages
+//   running in the to-be-evicted VM. If so, it will try to migrate those
+//   messages away from it. If it cannot, it will request the app to FREEZE
+std::shared_ptr<SchedulingDecision> SpotScheduler::makeSchedulingDecision(
+  HostMap& hostMap,
+  const InFlightReqs& inFlightReqs,
+  std::shared_ptr<BatchExecuteRequest> req)
+{
+    auto decision = std::make_shared<SchedulingDecision>(req->appid(), 0);
+
+    // Filter the hosts removing the VM that will be evicted next
+    std::set<std::string> evictedHostIps = filterHosts(hostMap);
+
+    // Get the sorted list of hosts
+    auto decisionType = getDecisionType(inFlightReqs, req);
+    auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType);
+
+    // Assign slots from the list (i.e. bin-pack)
+    auto itr = sortedHosts.begin();
+    int numLeftToSchedule = req->messages_size();
+    int msgIdx = 0;
+    while (itr < sortedHosts.end()) {
+        // Calculate how many slots can we assign to this host (assign as many
+        // as possible)
+        int numOnThisHost =
+          std::min<int>(numLeftToSchedule, numSlotsAvailable(*itr));
+        for (int i = 0; i < numOnThisHost; i++) {
+            decision->addMessage(getIp(*itr), req->messages(msgIdx));
+            msgIdx++;
+        }
+
+        // Update the number of messages left to schedule
+        numLeftToSchedule -= numOnThisHost;
+
+        // If there are no more messages to schedule, we are done
+        if (numLeftToSchedule == 0) {
+            break;
+        }
+
+        // Otherwise, it means that we have exhausted this host, and need to
+        // check in the next one
+        itr++;
+    }
+
+    bool isDistChange = decisionType == DecisionType::DIST_CHANGE;
+
+    // If we still have messages left to schedule, we have run out of slots
+    if (numLeftToSchedule > 0 && !isDistChange) {
+        return std::make_shared<SchedulingDecision>(NOT_ENOUGH_SLOTS_DECISION);
+    }
+
+    if (isDistChange) {
+        // If we ran out of slots whilst processing a migration request it
+        // means that we have some messages running in the to-be-evicted VM
+        // and we can not migrate them elsewhere. In this case we must FREEZE
+        // all messages
+        if (numLeftToSchedule > 0) {
+            return std::make_shared<SchedulingDecision>(MUST_FREEZE_DECISION);
+        }
+
+        // Check if we are running any messages in the to-be-evicted VM. Only
+        // migrate if we are
+        auto oldDecision = inFlightReqs.at(req->appid()).second;
+        for (const auto& hostIp : oldDecision->hosts) {
+            if (evictedHostIps.contains(hostIp)) {
+                // If we are requesting a migration, make sure that we minimise
+                // the number of messages to actually migrate
+                return minimiseNumOfMigrations(decision, oldDecision);
+            }
+        }
+
+        return std::make_shared<SchedulingDecision>(DO_NOT_MIGRATE_DECISION);
+    }
+
+    return decision;
+}
+}
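The histogram-matching step in minimiseNumOfMigrations is the subtle part of this scheduler. The standalone sketch below mirrors the same two-pass idea using only standard containers; all names are illustrative and not part of the diff (it assumes the histogram totals oldHosts.size()).

    #include <map>
    #include <string>
    #include <vector>

    // Re-assign messages to hosts so that per-host counts match the target
    // histogram, keeping as many messages as possible on their old host
    std::vector<std::string> matchHistogram(
      const std::vector<std::string>& oldHosts,
      std::map<std::string, int> target)
    {
        std::vector<std::string> result(oldHosts.size());

        // First pass: keep a message on its old host if that host still has
        // capacity left in the target histogram
        for (size_t i = 0; i < oldHosts.size(); i++) {
            if (target.count(oldHosts[i]) > 0 && target[oldHosts[i]] > 0) {
                result[i] = oldHosts[i];
                target[oldHosts[i]] -= 1;
            }
        }

        // Second pass: place the remaining messages on any host with spare
        // capacity in the histogram
        for (size_t i = 0; i < oldHosts.size(); i++) {
            if (result[i].empty()) {
                for (auto& [host, slots] : target) {
                    if (slots > 0) {
                        result[i] = host;
                        slots -= 1;
                        break;
                    }
                }
            }
        }

        return result;
    }

For example, old hosts {A, A, B, B} with target histogram {A: 3, B: 1} keeps three messages in place and migrates exactly one message from B to A.
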
diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp
index 32a91d776..c58f2093b 100644
--- a/src/executor/Executor.cpp
+++ b/src/executor/Executor.cpp
@@ -414,6 +414,32 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
                     }
                 }
             }
+        } catch (const faabric::util::FunctionFrozenException& ex) {
+            SPDLOG_DEBUG(
+              "Task {} frozen, shutting down executor {}", msg.id(), id);
+
+            returnValue = FROZEN_FUNCTION_RETURN_VALUE;
+
+            // TODO: maybe we do not need this here as we know this VM will be
+            // destroyed soon, but we do it nonetheless just in case
+            if (msg.ismpi()) {
+                auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
+                if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
+                    bool mustClear =
+                      mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
+
+                    if (mustClear) {
+                        SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
+                                     msg.appid(),
+                                     msg.groupid(),
+                                     msg.groupidx(),
+                                     msg.mpiworldid(),
+                                     msg.executedhost());
+
+                        mpiWorldRegistry.clearWorld(msg.mpiworldid());
+                    }
+                }
+            }
         } catch (const std::exception& ex) {
             returnValue = 1;
 
@@ -482,8 +508,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
         // main host we still have the zero-th thread executing)
         auto mainThreadSnapKey = faabric::util::getMainThreadSnapshotKey(msg);
         std::vector<faabric::util::SnapshotDiff> diffs;
-        // FIXME: thread 0 locally is not part of this batch, but is still
-        // in the same executor
+
         bool isRemoteThread =
           task.req->messages(0).mainhost() != conf.endpointHost;
         if (isLastThreadInBatch && doDirtyTracking && isRemoteThread) {
diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp
index f2a4aba02..1eeff755f 100644
--- a/src/planner/Planner.cpp
+++ b/src/planner/Planner.cpp
@@ -132,11 +132,19 @@ void Planner::printConfig() const
     SPDLOG_INFO("HTTP_SERVER_THREADS        {}", config.numthreadshttpserver());
 }
 
+std::string Planner::getPolicy()
+{
+    faabric::util::SharedLock lock(plannerMx);
+
+    return state.policy;
+}
+
 void Planner::setPolicy(const std::string& newPolicy)
 {
     // Acquire lock to prevent any changes in state whilst we change the policy
     faabric::util::FullLock lock(plannerMx);
 
+    state.policy = newPolicy;
     faabric::batch_scheduler::resetBatchScheduler(newPolicy);
 }
 
@@ -193,10 +201,16 @@ void Planner::flushSchedulingState()
 {
     faabric::util::FullLock lock(plannerMx);
 
+    state.policy = "bin-pack";
+
     state.inFlightReqs.clear();
     state.appResults.clear();
     state.appResultWaiters.clear();
+
     state.numMigrations = 0;
+
+    state.evictedRequests.clear();
+    state.nextEvictedHostIps.clear();
 }
 
 std::vector<std::shared_ptr<Host>> Planner::getAvailableHosts()
@@ -349,14 +363,57 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
                  msg->groupid(),
                  msg->groupidx());
 
+    // If we are setting the result for a frozen message, it is important
+    // that we store the message itself in the evicted BER as it contains
+    // information like the function pointer and snapshot key to eventually
+    // un-freeze from. In addition, we want to skip setting the message result
+    // as we will set it when the message finally succeeds
+    bool isFrozenMsg = msg->returnvalue() == FROZEN_FUNCTION_RETURN_VALUE;
+    if (isFrozenMsg) {
+        if (!state.evictedRequests.contains(msg->appid())) {
+            SPDLOG_ERROR("Message {} is frozen but app (id: {}) not in map!",
+                         msg->id(),
+                         msg->appid());
+            throw std::runtime_error("Orphaned frozen message!");
+        }
+
+        auto ber = state.evictedRequests.at(msg->appid());
+        bool found = false;
+        for (int i = 0; i < ber->messages_size(); i++) {
+            if (ber->messages(i).id() == msg->id()) {
+                SPDLOG_DEBUG("Setting message {} in the forzen BER for app {}",
+                             msg->id(),
+                             appId);
+
+                // Propagate the fields that we set during migration
+                ber->mutable_messages(i)->set_funcptr(msg->funcptr());
+                ber->mutable_messages(i)->set_inputdata(msg->inputdata());
+                ber->mutable_messages(i)->set_snapshotkey(msg->snapshotkey());
+                ber->mutable_messages(i)->set_returnvalue(msg->returnvalue());
+
+                found = true;
+                break;
+            }
+        }
+
+        if (!found) {
+            SPDLOG_ERROR(
+              "Error trying to set message {} in the frozen BER for app {}",
+              msg->id(),
+              appId);
+        }
+    }
+
     // Release the slot only once
     assert(state.hostMap.contains(msg->executedhost()));
-    if (!state.appResults[appId].contains(msgId)) {
+    if (!state.appResults[appId].contains(msgId) || isFrozenMsg) {
         releaseHostSlots(state.hostMap.at(msg->executedhost()));
     }
 
     // Set the result
-    state.appResults[appId][msgId] = msg;
+    if (!isFrozenMsg) {
+        state.appResults[appId][msgId] = msg;
+    }
 
     // Remove the message from the in-flight requests
     if (!state.inFlightReqs.contains(appId)) {
@@ -414,6 +471,12 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
         }
     }
 
+    // When setting a frozen message's result, we can skip notifying waiting
+    // hosts
+    if (isFrozenMsg) {
+        return;
+    }
+
     // Finally, dispatch an async message to all hosts that are waiting once
     // all planner accounting is updated
     if (state.appResultWaiters.find(msgId) != state.appResultWaiters.end()) {
@@ -537,20 +600,77 @@ std::shared_ptr<faabric::BatchExecuteRequestStatus> Planner::getBatchResults(
 {
     auto berStatus = faabric::util::batchExecStatusFactory(appId);
 
+    // When querying for the result of a batch we always check if it has been
+    // evicted, as this is one of the triggers to try to re-schedule it
+    bool isFrozen = false;
+    std::shared_ptr<BatchExecuteRequest> frozenBer = nullptr;
+
     // Acquire a read lock to copy all the results we have for this batch
     {
         faabric::util::SharedLock lock(plannerMx);
 
-        if (!state.appResults.contains(appId)) {
-            return nullptr;
+        if (state.evictedRequests.contains(appId)) {
+            isFrozen = true;
+
+            // To prevent race conditions, before treating an app as frozen
+            // we require all messages to have reported as frozen
+            for (const auto& msg :
+                 state.evictedRequests.at(appId)->messages()) {
+                if (msg.returnvalue() != FROZEN_FUNCTION_RETURN_VALUE) {
+                    isFrozen = false;
+                }
+            }
+
+            if (isFrozen) {
+                frozenBer = state.evictedRequests.at(appId);
+
+                // If the app is frozen (i.e. all messages have frozen) it
+                // should not be fully in the in-flight map anymore
+                if (state.inFlightReqs.contains(appId) &&
+                    frozenBer->messages_size() ==
+                      state.inFlightReqs.at(appId).first->messages_size()) {
+                    SPDLOG_ERROR("Inconsistent planner state: app {} is both "
+                                 "frozen and in-flight!",
+                                 appId);
+                    return nullptr;
+                }
+            }
+        }
+
+        if (!isFrozen) {
+            if (!state.appResults.contains(appId)) {
+                return nullptr;
+            }
+
+            for (auto msgResultPair : state.appResults.at(appId)) {
+                *berStatus->add_messageresults() = *(msgResultPair.second);
+            }
+
+            // Set the finished condition
+            berStatus->set_finished(!state.inFlightReqs.contains(appId));
         }
+    }
 
-        for (auto msgResultPair : state.appResults.at(appId)) {
-            *berStatus->add_messageresults() = *(msgResultPair.second);
+    // Only try to un-freeze when the app is fully frozen and not in-flight.
+    // Note that right after we un-freeze, the app may not yet be fully
+    // in-flight again, and hence we have not removed it from the evicted map
+    if (isFrozen && !state.inFlightReqs.contains(appId)) {
+        SPDLOG_DEBUG("Planner trying to un-freeze app {}", appId);
+
+        // This should trigger a NEW decision. We make a deep-copy of the BER
+        // to avoid changing the values in the evicted map
+        auto newBer = std::make_shared<BatchExecuteRequest>();
+        *newBer = *frozenBer;
+        auto decision = callBatch(newBer);
+
+        // If there are not enough free slots to schedule the request, we
+        // just return a keep-alive to the poller thread
+        if (*decision == NOT_ENOUGH_SLOTS_DECISION) {
+            SPDLOG_DEBUG("Can not un-freeze app {}: not enough slots!", appId);
         }
 
-        // Set the finished condition
-        berStatus->set_finished(!state.inFlightReqs.contains(appId));
+        // In any case, the app is not finished (still frozen or in-flight)
+        berStatus->set_finished(false);
     }
 
     return berStatus;
@@ -592,14 +712,41 @@ int Planner::getNumMigrations()
     return state.numMigrations.load(std::memory_order_acquire);
 }
 
+std::set<std::string> Planner::getNextEvictedHostIps()
+{
+    faabric::util::SharedLock lock(plannerMx);
+
+    return state.nextEvictedHostIps;
+}
+
+std::map<int32_t, std::shared_ptr<BatchExecuteRequest>>
+Planner::getEvictedReqs()
+{
+    faabric::util::SharedLock lock(plannerMx);
+
+    std::map<int32_t, std::shared_ptr<BatchExecuteRequest>> evictedReqs;
+
+    for (const auto& [appId, ber] : state.evictedRequests) {
+        evictedReqs[appId] = std::make_shared<BatchExecuteRequest>();
+        *evictedReqs.at(appId) = *ber;
+    }
+
+    return evictedReqs;
+}
+
 static faabric::batch_scheduler::HostMap convertToBatchSchedHostMap(
-  std::map<std::string, std::shared_ptr<Host>> hostMapIn)
+  std::map<std::string, std::shared_ptr<Host>> hostMapIn,
+  const std::set<std::string>& nextEvictedHostIps)
 {
     faabric::batch_scheduler::HostMap hostMap;
 
     for (const auto& [ip, host] : hostMapIn) {
         hostMap[ip] = std::make_shared<faabric::batch_scheduler::HostState>(
           host->ip(), host->slots(), host->usedslots());
+
+        if (nextEvictedHostIps.contains(ip)) {
+            hostMap.at(ip)->ip = MUST_EVICT_IP;
+        }
     }
 
     return hostMap;
@@ -620,7 +767,8 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
 
     // Make a copy of the host-map state to make sure the scheduling process
     // does not modify it
-    auto hostMapCopy = convertToBatchSchedHostMap(state.hostMap);
+    auto hostMapCopy =
+      convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIps);
     bool isDistChange =
       decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE;
 
@@ -652,17 +800,19 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) {
         decision = getPreloadedSchedulingDecision(appId, req);
     } else if (isNew && isMpi) {
-        mpiReq = faabric::util::batchExecFactory(
-          req->user(), req->function(), req->messages(0).mpiworldsize());
-        // Propagate the subtype for multi-tenant runs
-        mpiReq->set_subtype(req->subtype());
-
-        // Populate the temporary request
-        mpiReq->mutable_messages()->at(0) = req->messages(0);
-        faabric::util::updateBatchExecAppId(mpiReq, appId);
-        for (int i = 0; i < mpiReq->messages_size(); i++) {
-            mpiReq->mutable_messages()->at(i).set_groupidx(i);
+        mpiReq = std::make_shared<BatchExecuteRequest>();
+        *mpiReq = *req;
+
+        // Deep-copy as many messages as we can from the original BER, and
+        // mock the rest
+        for (int i = req->messages_size(); i < req->messages(0).mpiworldsize();
+             i++) {
+            auto* newMpiMsg = mpiReq->add_messages();
+
+            newMpiMsg->set_appid(req->appid());
+            newMpiMsg->set_groupidx(i);
         }
+        assert(mpiReq->messages_size() == req->messages(0).mpiworldsize());
 
         decision = batchScheduler->makeSchedulingDecision(
           hostMapCopy, state.inFlightReqs, mpiReq);
@@ -678,7 +828,9 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
           "Not enough free slots to schedule app: {} (requested: {})",
           appId,
           req->messages_size());
+#ifndef NDEBUG
         printHostState(state.hostMap, "error");
+#endif
         return decision;
     }
 
@@ -687,6 +839,67 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         return decision;
     }
 
+    if (*decision == MUST_FREEZE_DECISION) {
+        SPDLOG_INFO("Decided to FREEZE app: {}", appId);
+
+        // Note that the app will be naturally removed from in-flight as the
+        // messages throw an exception and finish, so here we only need to
+        // add the request to the evicted requests. Also, given that the
+        // app will be removed from in-flight, we want to deep copy the BER
+        state.evictedRequests[appId] =
+          std::make_shared<faabric::BatchExecuteRequest>();
+        *state.evictedRequests.at(appId) = *state.inFlightReqs.at(appId).first;
+
+        return decision;
+    }
+
+    // If we have managed to schedule a frozen request, un-freeze it by
+    // removing it from the evicted request map
+    if (state.evictedRequests.contains(appId)) {
+        // When un-freezing an MPI app, we treat it as a NEW request, and thus
+        // we will go through the two-step initialisation with a preloaded
+        // decision
+        if (isNew && isMpi) {
+            // During the first step, and to make the downstream assertions
+            // pass, it is safe to remove all messages with index greater
+            // than zero from here
+            SPDLOG_INFO("Decided to un-FREEZE app {}", appId);
+
+            auto firstMessage = req->messages(0);
+            req->clear_messages();
+            *req->add_messages() = firstMessage;
+        } else if (isMpi && !isDistChange) {
+            // During the second step, we amend the messages provided by MPI
+            // (as part of MPI_Init) with the fields that we require for a
+            // successful restore
+            assert(req->messages_size() == req->messages(0).mpiworldsize() - 1);
+
+            auto evictedBer = state.evictedRequests.at(appId);
+            for (int i = 0; i < req->messages_size(); i++) {
+                for (int j = 1; j < evictedBer->messages_size(); j++) {
+                    // We match by groupidx and not by message id, as the
+                    // message id is set by MPI and we need to overwrite it
+                    if (req->messages(i).groupidx() ==
+                        evictedBer->messages(j).groupidx()) {
+
+                        req->mutable_messages()->at(i).set_id(
+                          evictedBer->messages(j).id());
+                        req->mutable_messages()->at(i).set_funcptr(
+                          evictedBer->messages(j).funcptr());
+                        req->mutable_messages()->at(i).set_inputdata(
+                          evictedBer->messages(j).inputdata());
+                        req->mutable_messages()->at(i).set_snapshotkey(
+                          evictedBer->messages(j).snapshotkey());
+
+                        break;
+                    }
+                }
+            }
+
+            state.evictedRequests.erase(appId);
+        }
+    }
+
     // Skip claiming slots and ports if we have preemptively allocated them
     bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID;
 
@@ -967,6 +1180,30 @@ void Planner::dispatchSchedulingDecision(
             }
         }
 
+        // In an un-FREEZE request, we first need to push the snapshots to
+        // the destination host. These snapshots correspond to the messages
+        // that were FROZEN
+        if (!isThreads && !hostReq->messages(0).snapshotkey().empty()) {
+            // Unlike in a THREADS request, each un-frozen message has a
+            // different snapshot
+            // TODO: consider ways to optimise this transfer
+            for (int i = 0; i < hostReq->messages_size(); i++) {
+                auto snapshotKey = hostReq->messages(i).snapshotkey();
+                try {
+                    auto snap = snapshotRegistry.getSnapshot(snapshotKey);
+
+                    // TODO: could we push only the diffs?
+                    faabric::snapshot::getSnapshotClient(hostIp)->pushSnapshot(
+                      snapshotKey, snap);
+                } catch (std::runtime_error& e) {
+                    // Catch errors, but don't let them crash the planner. Let
+                    // the worker crash instead
+                    SPDLOG_ERROR("Snapshot {} not regsitered in planner!",
+                                 snapshotKey);
+                }
+            }
+        }
+
         faabric::scheduler::getFunctionCallClient(hostIp)->executeFunctions(
           hostReq);
     }
@@ -975,6 +1212,20 @@ void Planner::dispatchSchedulingDecision(
                  req->messages_size());
 }
 
+// TODO: should check if the VM is in the host map!
+void Planner::setNextEvictedVm(const std::set<std::string>& vmIps)
+{
+    faabric::util::FullLock lock(plannerMx);
+
+    if (state.policy != "spot") {
+        SPDLOG_ERROR("Error setting evicted VM with policy {}", state.policy);
+        SPDLOG_ERROR("To set the next evicted VM policy must be: spot");
+        throw std::runtime_error("Error setting the next evicted VM!");
+    }
+
+    state.nextEvictedHostIps = vmIps;
+}
+
 Planner& getPlanner()
 {
     static Planner planner;
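A hedged sketch of how an operator-side loop could read the new state exposed by the planner (getPolicy, getNextEvictedHostIps, getEvictedReqs); the wrapper function itself is illustrative and not part of the diff.

    #include <faabric/planner/Planner.h>
    #include <faabric/util/logging.h>

    void logSpotState()
    {
        auto& planner = faabric::planner::getPlanner();

        SPDLOG_INFO("Planner policy: {}", planner.getPolicy());

        // IPs the (simulated) cloud provider has marked for eviction
        for (const auto& ip : planner.getNextEvictedHostIps()) {
            SPDLOG_INFO("Next evicted VM: {}", ip);
        }

        // Apps that have been FROZEN and are waiting for free capacity
        for (const auto& [appId, ber] : planner.getEvictedReqs()) {
            SPDLOG_INFO(
              "Frozen app {} ({} messages)", appId, ber->messages_size());
        }
    }
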
diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp
index bb18e870e..2ef7f65ea 100644
--- a/src/planner/PlannerEndpointHandler.cpp
+++ b/src/planner/PlannerEndpointHandler.cpp
@@ -181,6 +181,12 @@ void PlannerEndpointHandler::onRequest(
                 auto* inFlightAppResp = inFlightAppsResponse.add_apps();
                 inFlightAppResp->set_appid(appId);
                 inFlightAppResp->set_subtype(inFlightPair.first->subtype());
+
+                if (inFlightPair.first->messages(0).ismpi()) {
+                    inFlightAppResp->set_size(
+                      inFlightPair.first->messages(0).mpiworldsize());
+                }
+
                 for (const auto& hostIp : decision->hosts) {
                     inFlightAppResp->add_hostips(hostIp);
                 }
@@ -191,6 +197,25 @@ void PlannerEndpointHandler::onRequest(
               faabric::planner::getPlanner().getNumMigrations();
             inFlightAppsResponse.set_nummigrations(numMigrations);
 
+            // Include the next VM(s) that will be evicted
+            auto nextEvictedIps =
+              faabric::planner::getPlanner().getNextEvictedHostIps();
+            for (const auto& nextEvictedHostIp : nextEvictedIps) {
+                inFlightAppsResponse.add_nextevictedvmips(nextEvictedHostIp);
+            }
+
+            // Include the currently frozen apps
+            auto evictedApps = faabric::planner::getPlanner().getEvictedReqs();
+            for (const auto& [appId, ber] : evictedApps) {
+                auto* frozenApp = inFlightAppsResponse.add_frozenapps();
+
+                frozenApp->set_appid(appId);
+
+                if (ber->messages(0).ismpi()) {
+                    frozenApp->set_size(ber->messages(0).mpiworldsize());
+                }
+            }
+
             response.result(beast::http::status::ok);
             response.body() =
               faabric::util::messageToJson(inFlightAppsResponse);
@@ -328,6 +353,48 @@ void PlannerEndpointHandler::onRequest(
 
             return ctx.sendFunction(std::move(response));
         }
+        case faabric::planner::HttpMessage_Type_GET_POLICY: {
+            SPDLOG_DEBUG("Planner received GET_POLICY request");
+
+            // Prepare the response
+            response.result(beast::http::status::ok);
+            response.body() = faabric::planner::getPlanner().getPolicy();
+
+            return ctx.sendFunction(std::move(response));
+        }
+        case faabric::planner::HttpMessage_Type_SET_NEXT_EVICTED_VM: {
+            SPDLOG_DEBUG("Planner received SET_NEXT_EVICTED_VM request");
+
+            SetEvictedVmIpsRequest evictedReq;
+            try {
+                faabric::util::jsonToMessage(msg.payloadjson(), &evictedReq);
+            } catch (faabric::util::JsonSerialisationException e) {
+                response.result(beast::http::status::bad_request);
+                response.body() = std::string("Bad JSON in body's payload");
+                return ctx.sendFunction(std::move(response));
+            }
+
+            std::set<std::string> vmIps;
+            for (const auto& vmIp : evictedReq.vmips()) {
+                vmIps.insert(vmIp);
+            }
+
+            try {
+                faabric::planner::getPlanner().setNextEvictedVm(vmIps);
+            } catch (std::exception& e) {
+                response.result(beast::http::status::bad_request);
+                response.body() = std::string(
+                  "Next evicted VM must only be set in 'spot' policy");
+                return ctx.sendFunction(std::move(response));
+            }
+
+            // Prepare the response
+            response.result(beast::http::status::ok);
+            response.body() = std::string("Next evicted VM set");
+
+            return ctx.sendFunction(std::move(response));
+        }
         default: {
             SPDLOG_ERROR("Unrecognised message type {}", msg.type());
             response.result(beast::http::status::bad_request);
diff --git a/src/planner/planner.proto b/src/planner/planner.proto
index 6744eaa4b..e0ebf21e6 100644
--- a/src/planner/planner.proto
+++ b/src/planner/planner.proto
@@ -46,6 +46,11 @@ message HttpMessage {
         EXECUTE_BATCH_STATUS = 11;
         PRELOAD_SCHEDULING_DECISION = 12;
         SET_POLICY = 13;
+        GET_POLICY = 14;
+        // This endpoint is only used with the SPOT planner policy. In a real
+        // deployment we would get this value from a cloud-provider-specific
+        // API
+        SET_NEXT_EVICTED_VM = 15;
     }
 
     Type type = 1 [json_name = "http_type"];
@@ -64,11 +69,22 @@ message GetInFlightAppsResponse {
     message InFlightApp {
         int32 appId = 1;
         int32 subType = 2;
-        repeated string hostIps = 3;
+        int32 size = 3;
+        repeated string hostIps = 4;
+    }
+
+    message FrozenApp {
+        int32 appId = 1;
+        int32 subType = 2;
+        int32 size = 3;
     }
 
     repeated InFlightApp apps = 1;
+
+    // Auxiliary fields to visualise the state of the cluster
     int32 numMigrations = 2;
+    repeated string nextEvictedVmIps = 3;
+    repeated FrozenApp frozenApps = 4;
 }
 
 message NumMigrationsResponse {
@@ -132,3 +148,6 @@ message AvailableHostsResponse {
     repeated Host hosts = 1;
 }
 
+message SetEvictedVmIpsRequest {
+    repeated string vmIps = 1;
+}
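A sketch of how a client could drive the new SET_NEXT_EVICTED_VM endpoint: build a SetEvictedVmIpsRequest, serialise it to JSON, and embed it in the HttpMessage payload. The include paths are assumptions and the wrapper function is illustrative; the HTTP transport call is elided.

    #include <faabric/planner/planner.pb.h> // assumed generated header path
    #include <faabric/util/json.h>          // assumed location of messageToJson

    std::string buildSetEvictedVmRequest()
    {
        // List the IP(s) of the VM(s) the planner should treat as evicted next
        faabric::planner::SetEvictedVmIpsRequest evictedReq;
        evictedReq.add_vmips("10.0.1.2");

        // Wrap it in the planner's HTTP message envelope
        faabric::planner::HttpMessage msg;
        msg.set_type(faabric::planner::HttpMessage_Type_SET_NEXT_EVICTED_VM);
        msg.set_payloadjson(faabric::util::messageToJson(evictedReq));

        // The JSON-serialised envelope is what gets POSTed to the planner
        return faabric::util::messageToJson(msg);
    }
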
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index ae7ac7f72..b9375b88a 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -17,8 +17,6 @@
 #include <faabric/util/snapshot.h>
 #include <faabric/util/testing.h>
 
-#include <unordered_set>
-
 using namespace faabric::util;
 using namespace faabric::snapshot;
 
@@ -298,9 +296,25 @@ void Scheduler::executeBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
         for (int i = 0; i < nMessages; i++) {
             faabric::Message& localMsg = req->mutable_messages()->at(i);
 
-            std::shared_ptr<faabric::executor::Executor> e =
-              claimExecutor(localMsg, lock);
-            e->executeTasks({ i }, req);
+            try {
+                std::shared_ptr<faabric::executor::Executor> exec =
+                  claimExecutor(localMsg, lock);
+                exec->executeTasks({ i }, req);
+            } catch (std::exception& exc) {
+                SPDLOG_ERROR(
+                  "{}:{}:{} Error trying to claim executor for message {}",
+                  localMsg.appid(),
+                  localMsg.groupid(),
+                  localMsg.groupidx(),
+                  localMsg.id());
+
+                // Even if the task is not executed, set its result so that
+                // everybody knows it has failed
+                localMsg.set_returnvalue(1);
+                localMsg.set_outputdata("Error trying to claim executor");
+                faabric::planner::getPlannerClient().setMessageResult(
+                  std::make_shared<faabric::Message>(localMsg));
+            }
         }
     }
 }
@@ -454,6 +468,8 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg,
         // Update the group ID if we want to migrate
         if (decision == DO_NOT_MIGRATE_DECISION) {
             newGroupId = groupId;
+        } else if (decision == MUST_FREEZE_DECISION) {
+            newGroupId = MUST_FREEZE;
         } else {
             newGroupId = decision.groupId;
         }
@@ -475,6 +491,13 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg,
         newGroupId = overwriteNewGroupId;
     }
 
+    if (newGroupId == MUST_FREEZE) {
+        auto migration = std::make_shared<faabric::PendingMigration>();
+        migration->set_appid(MUST_FREEZE);
+
+        return migration;
+    }
+
     bool appMustMigrate = newGroupId != groupId;
     if (!appMustMigrate) {
         return nullptr;
diff --git a/tests/dist/mpi/mpi_native.cpp b/tests/dist/mpi/mpi_native.cpp
index 95c0fc9de..fda78b96f 100644
--- a/tests/dist/mpi/mpi_native.cpp
+++ b/tests/dist/mpi/mpi_native.cpp
@@ -1,5 +1,6 @@
 #include "mpi_native.h"
 
+#include <faabric/batch-scheduler/BatchScheduler.h>
 #include <faabric/executor/ExecutorContext.h>
 #include <faabric/mpi/MpiContext.h>
 #include <faabric/mpi/MpiMessage.h>
@@ -15,6 +16,7 @@
 #include <faabric/util/config.h>
 #include <faabric/util/logging.h>
 #include <faabric/util/memory.h>
+#include <faabric/util/network.h>
 
 using namespace faabric::mpi;
 
@@ -785,7 +787,40 @@ void mpiMigrationPoint(int entrypointFuncArg)
 
     // Detect if there is a pending migration for the current app
     auto migration = sch.checkForMigrationOpportunities(*call);
-    bool appMustMigrate = migration != nullptr;
+    bool appMustFreeze =
+      migration != nullptr && migration->appid() == MUST_FREEZE;
+
+    // Short-cut for when all messages need to freeze. We only need to send
+    // a snapshot to the planner, and throw an exception
+    if (appMustFreeze) {
+        std::string argStr = std::to_string(entrypointFuncArg);
+        std::vector<uint8_t> inputData(argStr.begin(), argStr.end());
+        std::string snapKey = "migration_" + std::to_string(call->id());
+
+        call->set_inputdata(inputData.data(), inputData.size());
+        call->set_snapshotkey(snapKey);
+
+        auto* exec = faabric::executor::ExecutorContext::get()->getExecutor();
+        auto snap =
+          std::make_shared<faabric::util::SnapshotData>(exec->getMemoryView());
+        auto& reg = faabric::snapshot::getSnapshotRegistry();
+        reg.registerSnapshot(snapKey, snap);
+
+        auto plannerIp = faabric::util::getIPFromHostname(
+          faabric::util::getSystemConfig().plannerHost);
+        faabric::snapshot::getSnapshotClient(plannerIp)->pushSnapshot(snapKey,
+                                                                      snap);
+
+        SPDLOG_INFO("{}:{}:{} Freezing message!",
+                    call->appid(),
+                    call->groupid(),
+                    call->groupidx());
+
+        // Throw an exception to be caught by the executor and terminate
+        throw faabric::util::FunctionFrozenException("Freezing MPI rank");
+    }
+
+    bool appMustMigrate = migration != nullptr && !appMustFreeze;
 
     // Detect if this particular function needs to be migrated or not
     bool funcMustMigrate = false;
diff --git a/tests/dist/mpi/test_multiple_mpi_worlds.cpp b/tests/dist/mpi/test_multiple_mpi_worlds.cpp
index 74d9990ff..8714ad3a4 100644
--- a/tests/dist/mpi/test_multiple_mpi_worlds.cpp
+++ b/tests/dist/mpi/test_multiple_mpi_worlds.cpp
@@ -243,4 +243,168 @@ TEST_CASE_METHOD(MpiDistTestsFixture,
     checkAllocationAndResult(req1, hostsAfter1);
     checkAllocationAndResult(req2, hostsAfter2);
 }
+
+TEST_CASE_METHOD(
+  MpiDistTestsFixture,
+  "Test migrating an MPI app as a consequence of an eviction (SPOT)",
+  "[mpi]")
+{
+    updatePlannerPolicy("spot");
+
+    int worldSize = 4;
+
+    // Prepare both requests:
+    // - The first will do work, sleep for five seconds, and do work again
+    // - The second will do work and check for migration opportunities
+    auto req1 = setRequest("alltoall-sleep");
+    auto req2 = setRequest("migration");
+    auto& msg = req2->mutable_messages()->at(0);
+    msg.set_inputdata(std::to_string(NUM_MIGRATION_LOOPS));
+
+    updateLocalSlots(8);
+    updateRemoteSlots(8);
+
+    std::vector<std::string> hostsBefore1 = {
+        getMasterIP(), getMasterIP(), getMasterIP(), getMasterIP()
+    };
+    std::vector<std::string> hostsBefore2;
+    std::vector<std::string> hostsAfterMigration;
+
+    std::string evictedVmIp;
+
+    SECTION("Migrate main rank")
+    {
+        hostsBefore2 = {
+            getWorkerIP(), getWorkerIP(), getMasterIP(), getMasterIP()
+        };
+        evictedVmIp = getWorkerIP();
+        hostsAfterMigration =
+          std::vector<std::string>(worldSize, getMasterIP());
+    }
+
+    SECTION("Don't migrate main rank")
+    {
+        hostsBefore2 = {
+            getMasterIP(), getMasterIP(), getWorkerIP(), getWorkerIP()
+        };
+        evictedVmIp = getWorkerIP();
+        hostsAfterMigration =
+          std::vector<std::string>(worldSize, getMasterIP());
+    }
+
+    SECTION("Migrate all ranks")
+    {
+        hostsBefore2 = {
+            getMasterIP(), getMasterIP(), getMasterIP(), getMasterIP()
+        };
+        evictedVmIp = getMasterIP();
+        hostsAfterMigration =
+          std::vector<std::string>(worldSize, getWorkerIP());
+    }
+
+    // Preload decisions to force sub-optimal scheduling
+    auto preloadDec1 = std::make_shared<batch_scheduler::SchedulingDecision>(
+      req1->appid(), req1->groupid());
+    auto preloadDec2 = std::make_shared<batch_scheduler::SchedulingDecision>(
+      req2->appid(), req2->groupid());
+    for (int i = 0; i < worldSize; i++) {
+        preloadDec1->addMessage(hostsBefore1.at(i), 0, 0, i);
+        preloadDec2->addMessage(hostsBefore2.at(i), 0, 0, i);
+    }
+    plannerCli.preloadSchedulingDecision(preloadDec1);
+    plannerCli.preloadSchedulingDecision(preloadDec2);
+
+    // Preloaded decisions take precedence over the eviction mark, so we can
+    // set the evicted IP before calling callFunctions
+    setNextEvictedVmIp({ evictedVmIp });
+
+    plannerCli.callFunctions(req1);
+    auto actualHostsBefore1 = waitForMpiMessagesInFlight(req1);
+    REQUIRE(hostsBefore1 == actualHostsBefore1);
+
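+    // Launch the second app; the SPOT policy will detect the eviction mark
+    // on one of its VMs and migrate the affected ranks away from it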
+    plannerCli.callFunctions(req2);
+
+    checkAllocationAndResult(req1, hostsBefore1);
+    checkAllocationAndResult(req2, hostsAfterMigration);
+
+    updatePlannerPolicy("bin-pack");
+}
+
+TEST_CASE_METHOD(MpiDistTestsFixture,
+                 "Test stopping and resuming an MPI application (SPOT)",
+                 "[mpi]")
+{
+    updatePlannerPolicy("spot");
+
+    int worldSize = 4;
+
+    // Prepare both requests:
+    // - The first will do work, sleep for five seconds, and do work again
+    // - The second will do work and check for migration opportunities
+    auto req1 = setRequest("alltoall-sleep");
+    auto req2 = setRequest("migration");
+    auto& msg = req2->mutable_messages()->at(0);
+    msg.set_inputdata(std::to_string(NUM_MIGRATION_LOOPS));
+
+    // Make it so that there are not enough slots to migrate to. We will have
+    // to wait for the first request to finish before we can resume the app
+    updateLocalSlots(4);
+    updateRemoteSlots(4);
+
+    auto hostsBefore1 = std::vector<std::string>(worldSize, getMasterIP());
+    // This app will realise it is running on a VM that will be evicted, so
+    // it will FREEZE
+    auto hostsBefore2 = std::vector<std::string>(worldSize, getWorkerIP());
+    std::string evictedVmIp = getWorkerIP();
+
+    // Preload decisions to force the allocation we want
+    auto preloadDec1 = std::make_shared<batch_scheduler::SchedulingDecision>(
+      req1->appid(), req1->groupid());
+    auto preloadDec2 = std::make_shared<batch_scheduler::SchedulingDecision>(
+      req2->appid(), req2->groupid());
+    for (int i = 0; i < worldSize; i++) {
+        preloadDec1->addMessage(hostsBefore1.at(i), 0, 0, i);
+        preloadDec2->addMessage(hostsBefore2.at(i), 0, 0, i);
+    }
+    plannerCli.preloadSchedulingDecision(preloadDec1);
+    plannerCli.preloadSchedulingDecision(preloadDec2);
+
+    // Mark the worker VM as evicted (note that preload takes preference over
+    // eviction marks)
+    setNextEvictedVmIp({ evictedVmIp });
+
+    plannerCli.callFunctions(req1);
+    auto actualHostsBefore1 = waitForMpiMessagesInFlight(req1);
+    REQUIRE(hostsBefore1 == actualHostsBefore1);
+
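+    // Launch the second app; it runs entirely on the evicted VM and, with no
+    // free slots to migrate to, the SPOT policy will FREEZE it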
+    plannerCli.callFunctions(req2);
+
+    // First, if we try to get the batch results it should say that the app
+    // is not finished (even though the planner will try to re-schedule it).
+    // In the eyes of the client, a FROZEN app is still running
+    auto batchResults2 = plannerCli.getBatchResults(req2);
+    REQUIRE(!batchResults2->finished());
+
+    // Second, let's wait for the first request to finish so that more
+    // slots free up
+    checkAllocationAndResult(req1, hostsBefore1);
+
+    // Third, no apps are currently in-flight (1 finished, 1 frozen)
+    auto inFlightApps = getInFlightApps();
+    REQUIRE(inFlightApps.apps_size() == 0);
+    REQUIRE(inFlightApps.frozenapps_size() == 1);
+    REQUIRE(inFlightApps.frozenapps(0).appid() == req2->appid());
+
+    // Fourth, try to get the batch results for the FROZEN app again to trigger
+    // an un-FREEZE
+    batchResults2 = plannerCli.getBatchResults(req2);
+    REQUIRE(!batchResults2->finished());
+
+    // Finally, we should be able to wait for the second app to finish, and
+    // check that it has been migrated to the master VM
+    auto hostsAfterMigration =
+      std::vector<std::string>(worldSize, getMasterIP());
+    checkAllocationAndResult(req2, hostsAfterMigration);
+
+    updatePlannerPolicy("bin-pack");
+}
 }
diff --git a/tests/test/batch-scheduler/test_spot_scheduler.cpp b/tests/test/batch-scheduler/test_spot_scheduler.cpp
new file mode 100644
index 000000000..13f63b219
--- /dev/null
+++ b/tests/test/batch-scheduler/test_spot_scheduler.cpp
@@ -0,0 +1,450 @@
+#include <catch2/catch.hpp>
+
+#include "fixtures.h"
+
+#include <faabric/batch-scheduler/BatchScheduler.h>
+#include <faabric/batch-scheduler/SpotScheduler.h>
+
+using namespace faabric::batch_scheduler;
+
+namespace tests {
+
+class SpotSchedulerTestFixture : public BatchSchedulerFixture
+{
+  public:
+    SpotSchedulerTestFixture()
+    {
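+        // Set the scheduling policy to "spot" before building the batch
+        // scheduler, so this fixture exercises the SPOT scheduler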
+        conf.batchSchedulerMode = "spot";
+        batchScheduler = getBatchScheduler();
+    }
+};
+
+// SPOT should behave the same as COMPACT and BIN-PACK for NEW requests
+TEST_CASE_METHOD(SpotSchedulerTestFixture,
+                 "Test scheduling of new requests with Spot",
+                 "[batch-scheduler]")
+{
+    // To mock new requests (i.e. DecisionType::NEW), we always set the
+    // InFlightReqs map to an empty map
+    BatchSchedulerConfig config = {
+        .hostMap = {},
+        .inFlightReqs = {},
+        .expectedDecision = SchedulingDecision(appId, groupId),
+    };
+
+    SECTION("Compact scheduler gives up if not enough slots are available")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 1, 1 }, { 0, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 6);
+        config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION;
+    }
+
+    SECTION("Scheduling fits in one host")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "foo" });
+    }
+
+    SECTION("Scheduling is exactly one host")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "foo", "foo" });
+    }
+
+    // Like the bin-pack scheduler, SPOT will pick hosts with more free slots
+    // first
+    SECTION("Scheduling spans two hosts")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 6);
+        config.expectedDecision = buildExpectedDecision(
+          ber, { "foo", "foo", "foo", "foo", "bar", "bar" });
+    }
+
+    SECTION("Scheduling spans exactly two hosts")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 7);
+        config.expectedDecision = buildExpectedDecision(
+          ber, { "foo", "foo", "foo", "foo", "bar", "bar", "bar" });
+    }
+
+    // In particular, it will prioritise hosts with overall less capacity if
+    // they have more free resources
+    SECTION("Scheduling spans two hosts")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 4 }, { 0, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "foo", "bar" });
+    }
+
+    // In case of a tie in free resources, the scheduler will pick hosts with
+    // larger overall capacity first
+    SECTION("Scheduling spans two hosts with same free resources")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 6);
+        config.expectedDecision = buildExpectedDecision(
+          ber, { "foo", "foo", "foo", "bar", "bar", "bar" });
+    }
+
+    // If there is still a tie, the scheduler breaks it by sorting the host
+    // names in reverse alphabetical order
+    SECTION("Scheduling spans two hosts with same free resources and size")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 3 }, { 0, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 6);
+        config.expectedDecision = buildExpectedDecision(
+          ber, { "foo", "foo", "foo", "bar", "bar", "bar" });
+    }
+
+    SECTION("Scheduling spans an arbitrarily large number of hosts")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar", "baz", "bip", "bup" },
+                                      { 4, 6, 2, 3, 1 },
+                                      { 0, 2, 2, 2, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 10);
+        config.expectedDecision = buildExpectedDecision(ber,
+                                                        { "bar",
+                                                          "bar",
+                                                          "bar",
+                                                          "bar",
+                                                          "foo",
+                                                          "foo",
+                                                          "foo",
+                                                          "foo",
+                                                          "bip",
+                                                          "bup" });
+    }
+
+    actualDecision = *batchScheduler->makeSchedulingDecision(
+      config.hostMap, config.inFlightReqs, ber);
+    compareSchedulingDecisions(actualDecision, config.expectedDecision);
+}
+
+// SPOT should behave the same as COMPACT and BIN-PACK for SCALE_CHANGE requests
+TEST_CASE_METHOD(SpotSchedulerTestFixture,
+                 "Test scheduling of scale-change requests with SPOT",
+                 "[batch-scheduler]")
+{
+    // To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we
+    // need to have one in-flight request in the map with the same app id
+    // (and not of type MIGRATION)
+    BatchSchedulerConfig config = {
+        .hostMap = {},
+        .inFlightReqs = {},
+        .expectedDecision = SchedulingDecision(appId, groupId),
+    };
+
+    // The configs in this test must be read as follows:
+    // - the host map's used slots contain the current distribution for the app
+    //   (i.e. the number of used slots matches the number in in-flight reqs)
+    // - the host map's slots contain the total slots
+    // - the ber contains the NEW messages we are going to add
+    // - the expected decision includes the expected scheduling decision for
+    //   the new messages
+
+    SECTION("Compact scheduler gives up if not enough slots are available")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 2, 1 }, { 1, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 6);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" });
+        config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION;
+    }
+
+    // When scheduling a SCALE_CHANGE request, we always try to colocate as
+    // much as possible
+    SECTION("Scheduling fits in one host")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "foo" });
+    }
+
+    // We prefer hosts with less capacity if they are already running requests
+    // for the same app
+    SECTION("Scheduling fits in one host and prefers known hosts")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 5, 4 }, { 0, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "bar" });
+    }
+
+    // Like with `NEW` requests, we can also spill to other hosts
+    SECTION("Scheduling spans more than one host")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" });
+    }
+
+    // If two hosts are already executing the app, we pick the one that is
+    // running the largest number of messages
+    SECTION("Scheduler prefers hosts with more running messages")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 1);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 3, { "bar", "bar", "foo" });
+        config.expectedDecision = buildExpectedDecision(ber, { "bar" });
+    }
+
+    // Again, when picking a new host to spill to, we prioritise hosts that
+    // are already running requests for this app
+    SECTION("Scheduling always picks known hosts first")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 3, 2 },
+          { 0, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 5);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "baz", "foo", "foo" });
+    }
+
+    // Sometimes the preferred hosts just don't have slots. They will be sorted
+    // first but the scheduler will skip them when bin-packing
+    SECTION("Scheduler ignores preferred but full hosts")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 2, 2 },
+          { 0, 2, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 3, { "bar", "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "baz", "foo", "foo" });
+    }
+
+    // In case of a tie in the number of running messages, we revert to
+    // `NEW`-like tie-breaking
+    SECTION("In case of a tie of preferred hosts, fall-back to known "
+            "tie-breaks (free slots)")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 3, 2 },
+          { 0, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "baz" });
+    }
+
+    SECTION("In case of a tie of preferred hosts, fall-back to known "
+            "tie-breaks (size)")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 3, 2 },
+          { 0, 2, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "baz", "foo" });
+    }
+
+    SECTION("In case of a tie of preferred hosts, fall-back to known "
+            "tie-breaks (alphabetical)")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 2, 2 },
+          { 0, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "baz", "bar", "foo" });
+    }
+
+    actualDecision = *batchScheduler->makeSchedulingDecision(
+      config.hostMap, config.inFlightReqs, ber);
+    compareSchedulingDecisions(actualDecision, config.expectedDecision);
+}
+
+// DIST_CHANGE requests in the SPOT scheduler are only concerned with tainted
+// VMs (i.e. VMs that are going to be evicted):
+// - If nothing is going to be evicted, there should be no migration happening
+// - If we know one VM is going to be evicted:
+//  - We migrate out of it if there are slots
+//  - We FREEZE if there are no slots to migrate to
+TEST_CASE_METHOD(SpotSchedulerTestFixture,
+                 "Test scheduling of dist-change requests with SPOT",
+                 "[batch-scheduler]")
+{
+    // To mock a dist-change request (i.e. DecisionType::DIST_CHANGE), we
+    // need to have one in-flight request in the map with the same app id, the
+    // same size (and of type MIGRATION)
+    BatchSchedulerConfig config = {
+        .hostMap = {},
+        .inFlightReqs = {},
+        .expectedDecision = SchedulingDecision(appId, groupId),
+    };
+
+    // Note: the way we let the SPOT scheduler know which VMs will be evicted
+    // is by setting the IP field in the HostMap **value** (not the key)
+    // to MUST_EVICT_IP
+
+    SECTION("SPOT returns nothing if there are no tainted VMs")
+    {
+        config.hostMap = buildHostMap({ "foo" }, { 4 }, { 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 2);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "foo", "foo" });
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("SPOT returns nothing if there are no tainted VMs (multiple)")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 2 }, { 4, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 5);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 5, { "foo", "foo", "foo", "foo", "bar" });
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("SPOT ignores opportunities to free-up hosts")
+    {
+        config.hostMap =
+          buildHostMap({ "foo", "bar", "baz" }, { 4, 4, 4 }, { 2, 2, 4 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "baz", "baz", "baz", "baz" });
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("SPOT deallocates an app if the VM is tainted and not enough slots "
+            "(single-host)")
+    {
+        config.hostMap = buildHostMap({ "foo" }, { 4 }, { 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 2);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "foo", "foo" });
+        config.expectedDecision = MUST_FREEZE_DECISION;
+        // Must evict host foo
+        markHostAsEvicted(config.hostMap, "foo");
+    }
+
+    SECTION("SPOT deallocates an app if the VM is tainted and not enough slots "
+            "(multiple-hosts)")
+    {
+        config.hostMap = buildHostMap(
+          { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 4, 2, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" });
+        config.expectedDecision = MUST_FREEZE_DECISION;
+        // Must evict host foo
+        markHostAsEvicted(config.hostMap, "foo");
+    }
+
+    SECTION("SPOT migrated an app if enough slots")
+    {
+        config.hostMap = buildHostMap(
+          { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 2, 2, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "baz", "baz", "lol", "lol" });
+
+        // Must evict host baz
+        markHostAsEvicted(config.hostMap, "baz");
+
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "lol", "lol" });
+    }
+
+    SECTION("SPOT ignores evicted hosts if not running messages in it")
+    {
+        config.hostMap = buildHostMap(
+          { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 2, 2, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "baz", "baz", "lol", "lol" });
+
+        // Must evict host foo
+        markHostAsEvicted(config.hostMap, "foo");
+
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("Compact prefers hosts running more messages")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 3, 2, 1 },
+          { 2, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "baz" });
+
+        markHostAsEvicted(config.hostMap, "bar");
+
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "foo", "baz" });
+    }
+
+    SECTION("Compact will minimise the number of messages to migrate")
+    {
+        config.hostMap =
+          buildHostMap({ "foo", "bar", "baz" }, { 5, 4, 2 }, { 3, 4, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 9);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs = buildInFlightReqs(
+          ber,
+          9,
+          { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "baz", "baz" });
+
+        markHostAsEvicted(config.hostMap, "baz");
+
+        config.expectedDecision = buildExpectedDecision(
+          ber,
+          { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" });
+    }
+
+    actualDecision = *batchScheduler->makeSchedulingDecision(
+      config.hostMap, config.inFlightReqs, ber);
+    compareSchedulingDecisions(actualDecision, config.expectedDecision);
+}
+}
diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp
index 62578f67d..c5f3043c5 100644
--- a/tests/test/planner/test_planner_endpoint.cpp
+++ b/tests/test/planner/test_planner_endpoint.cpp
@@ -662,9 +662,14 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
     // in-flight apps (we poll the planner until all messages have finished)
     waitForBerToFinish(ber);
 
+    // We also set this host as evicted
+    updatePlannerPolicy("spot");
+    setNextEvictedVmIp({ conf.endpointHost });
+
     // Once we are sure the batch has finished, check again that there are
     // zero apps in-flight
     GetInFlightAppsResponse emptyExpectedResponse;
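+    // With the "spot" policy set, the in-flight apps response also reports
+    // the VM IPs marked for eviction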
+    emptyExpectedResponse.add_nextevictedvmips(conf.endpointHost);
     msgJsonStr = faabric::util::messageToJson(inFlightMsg);
     expectedResponseBody = faabric::util::messageToJson(emptyExpectedResponse);
     result = doPost(msgJsonStr);
@@ -748,6 +753,13 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
         expectedResponseBody = "Policy set correctly";
     }
 
+    SECTION("Valid request (spot)")
+    {
+        policy = "spot";
+        expectedReturnCode = beast::http::status::ok;
+        expectedResponseBody = "Policy set correctly";
+    }
+
     SECTION("Invalid request")
     {
         policy = "foo-bar";
@@ -762,5 +774,73 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
     REQUIRE(boost::beast::http::int_to_status(result.first) ==
             expectedReturnCode);
     REQUIRE(result.second == expectedResponseBody);
+
+    if (expectedReturnCode == beast::http::status::ok) {
+        // Second, get the policy we just set
+        msg.set_type(HttpMessage_Type_GET_POLICY);
+        msg.clear_payloadjson();
+        msgJsonStr = faabric::util::messageToJson(msg);
+
+        std::pair<int, std::string> result = doPost(msgJsonStr);
+        REQUIRE(boost::beast::http::int_to_status(result.first) ==
+                expectedReturnCode);
+        REQUIRE(result.second == policy);
+    }
+}
+
+TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
+                 "Test setting the next evicted VM",
+                 "[planner]")
+{
+    HttpMessage policyMsg;
+    policyMsg.set_type(HttpMessage_Type_SET_POLICY);
+    HttpMessage msg;
+
+    SetEvictedVmIpsRequest evictedRequest;
+    evictedRequest.add_vmips("1.1.1.1");
+
+    msg.set_type(HttpMessage_Type_SET_NEXT_EVICTED_VM);
+    msg.set_payloadjson(faabric::util::messageToJson(evictedRequest));
+    msgJsonStr = faabric::util::messageToJson(msg);
+
+    std::string policy;
+
+    SECTION("Valid request")
+    {
+        policy = "spot";
+        expectedReturnCode = beast::http::status::ok;
+        expectedResponseBody = "Next evicted VM set";
+    }
+
+    SECTION("Invalid request (bad request body)")
+    {
+        policy = "spot";
+        msg.set_payloadjson("1.1.1.1");
+        msgJsonStr = faabric::util::messageToJson(msg);
+        expectedReturnCode = beast::http::status::bad_request;
+        expectedResponseBody = "Bad JSON in body's payload";
+    }
+
+    SECTION("Invalid request (bad policy)")
+    {
+        policy = "compact";
+        expectedReturnCode = beast::http::status::bad_request;
+        expectedResponseBody =
+          "Next evicted VM must only be set in 'spot' policy";
+    }
+
+    policyMsg.set_payloadjson(policy);
+    std::string policyMsgJsonStr = faabric::util::messageToJson(policyMsg);
+
+    // First set the policy
+    std::pair<int, std::string> result = doPost(policyMsgJsonStr);
+    REQUIRE(boost::beast::http::int_to_status(result.first) ==
+            boost::beast::http::status::ok);
+
+    // Second set the next evicted VM
+    result = doPost(msgJsonStr);
+    REQUIRE(boost::beast::http::int_to_status(result.first) ==
+            expectedReturnCode);
+    REQUIRE(result.second == expectedResponseBody);
 }
 }
diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp
index 00a589dc0..14ed0b1e4 100644
--- a/tests/test/scheduler/test_scheduler.cpp
+++ b/tests/test/scheduler/test_scheduler.cpp
@@ -195,7 +195,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
     SECTION("Processes")
     {
         execMode = faabric::BatchExecuteRequest::PROCESSES;
-        expectedSnapshot = "procSnap";
+        expectedSnapshot = "";
 
         expectedSubType = 345;
         expectedContextData = "proc context";
diff --git a/tests/test/transport/test_message_endpoint_client.cpp b/tests/test/transport/test_message_endpoint_client.cpp
index 493c6a35b..a92b1d2ae 100644
--- a/tests/test/transport/test_message_endpoint_client.cpp
+++ b/tests/test/transport/test_message_endpoint_client.cpp
@@ -17,7 +17,8 @@ using namespace faabric::transport;
 namespace tests {
 
-// These tests are unstable under ThreadSanitizer
+// These tests are unstable under ThreadSanitizer and AddressSanitizer
-#if !(defined(__has_feature) && __has_feature(thread_sanitizer))
+#if !(defined(__has_feature) &&                                                \
+      (__has_feature(thread_sanitizer) || __has_feature(address_sanitizer)))
 
 TEST_CASE_METHOD(SchedulerFixture, "Test send/recv one message", "[transport]")
 {
@@ -315,18 +316,6 @@ TEST_CASE_METHOD(SchedulerFixture,
     }
 
     REQUIRE(success.load(std::memory_order_acquire));
-
-    for (auto& t : senders) {
-        if (t.joinable()) {
-            t.join();
-        }
-    }
-
-    for (auto& t : receivers) {
-        if (t.joinable()) {
-            t.join();
-        }
-    }
 }
 
 #endif // End ThreadSanitizer exclusion
diff --git a/tests/utils/faabric_utils.h b/tests/utils/faabric_utils.h
index 2e7c777c7..62c0fb39e 100644
--- a/tests/utils/faabric_utils.h
+++ b/tests/utils/faabric_utils.h
@@ -3,6 +3,7 @@
 #include <catch2/catch.hpp>
 
 #include <faabric/batch-scheduler/SchedulingDecision.h>
+#include <faabric/planner/planner.pb.h>
 #include <faabric/state/State.h>
 #include <faabric/state/StateServer.h>
 #include <faabric/util/ExecGraph.h>
@@ -97,5 +98,9 @@ void flushPlannerWorkers();
 
 void resetPlanner();
 
+faabric::planner::GetInFlightAppsResponse getInFlightApps();
+
 void updatePlannerPolicy(const std::string& newPolicy);
+
+void setNextEvictedVmIp(const std::set<std::string>& evictedVmIps);
 }
diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h
index 86b4776f5..f40e2405f 100644
--- a/tests/utils/fixtures.h
+++ b/tests/utils/fixtures.h
@@ -646,6 +646,12 @@ class BatchSchedulerFixture : public ConfFixture
         return decision;
     }
 
+    static void markHostAsEvicted(faabric::batch_scheduler::HostMap& hostMap,
+                                  const std::string& hostIp)
+    {
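+        // Overwrite the IP field of the HostMap *value* (not the key) with
+        // MUST_EVICT_IP, which is the field the SPOT scheduler checks to
+        // detect evicted hosts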
+        hostMap.at(hostIp)->ip = MUST_EVICT_IP;
+    }
+
     static void compareSchedulingDecisions(
       const faabric::batch_scheduler::SchedulingDecision& decisionA,
       const faabric::batch_scheduler::SchedulingDecision& decisionB)
diff --git a/tests/utils/planner_utils.cpp b/tests/utils/planner_utils.cpp
index 2956f06ad..5c0422a58 100644
--- a/tests/utils/planner_utils.cpp
+++ b/tests/utils/planner_utils.cpp
@@ -29,6 +29,41 @@ void updatePlannerPolicy(const std::string& newPolicy)
     assert(result.first == 200);
 }
 
+void setNextEvictedVmIp(const std::set<std::string>& evictedVmIps)
+{
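+    // Build a SET_NEXT_EVICTED_VM HTTP message listing the IPs to evict, and
+    // POST it to the planner's HTTP endpoint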
+    faabric::planner::HttpMessage msg;
+    msg.set_type(faabric::planner::HttpMessage_Type_SET_NEXT_EVICTED_VM);
+
+    faabric::planner::SetEvictedVmIpsRequest evictedRequest;
+    for (const auto& evictedIp : evictedVmIps) {
+        evictedRequest.add_vmips(evictedIp);
+    }
+    msg.set_payloadjson(faabric::util::messageToJson(evictedRequest));
+    std::string jsonStr = faabric::util::messageToJson(msg);
+
+    faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();
+    std::pair<int, std::string> result =
+      postToUrl(conf.plannerHost, conf.plannerPort, jsonStr);
+    assert(result.first == 200);
+}
+
+faabric::planner::GetInFlightAppsResponse getInFlightApps()
+{
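+    // Ask the planner's HTTP endpoint for the apps currently in flight
+    // (including frozen ones) and parse the JSON response into a protobuf
+    // message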
+    faabric::planner::HttpMessage msg;
+    msg.set_type(faabric::planner::HttpMessage_Type_GET_IN_FLIGHT_APPS);
+    std::string jsonStr = faabric::util::messageToJson(msg);
+
+    faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();
+    std::pair<int, std::string> result =
+      postToUrl(conf.plannerHost, conf.plannerPort, jsonStr);
+    assert(result.first == 200);
+
+    faabric::planner::GetInFlightAppsResponse response;
+    faabric::util::jsonToMessage(result.second, &response);
+
+    return response;
+}
+
 void flushPlannerWorkers()
 {
     faabric::planner::HttpMessage msg;