diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f85309ad..78d0121e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,7 @@ endfunction() add_subdirectory(src/batch-scheduler) add_subdirectory(src/endpoint) +add_subdirectory(src/executor) add_subdirectory(src/flat) add_subdirectory(src/mpi) add_subdirectory(src/planner) @@ -121,6 +122,7 @@ add_library(faabric faabric.cpp $ $ + $ $ $ $ diff --git a/include/faabric/executor/Executor.h b/include/faabric/executor/Executor.h new file mode 100644 index 000000000..47e11a295 --- /dev/null +++ b/include/faabric/executor/Executor.h @@ -0,0 +1,119 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace faabric::executor { + +class ChainedCallException : public faabric::util::FaabricException +{ + public: + explicit ChainedCallException(std::string message) + : FaabricException(std::move(message)) + {} +}; + +class Executor +{ + public: + std::string id; + + explicit Executor(faabric::Message& msg); + + // Must be marked virtual to permit proper calling of subclass destructors + virtual ~Executor(); + + void executeTasks(std::vector msgIdxs, + std::shared_ptr req); + + virtual void shutdown(); + + virtual void reset(faabric::Message& msg); + + virtual int32_t executeTask( + int threadPoolIdx, + int msgIdx, + std::shared_ptr req); + + bool tryClaim(); + + void claim(); + + void releaseClaim(); + + std::shared_ptr getMainThreadSnapshot( + faabric::Message& msg, + bool createIfNotExists = false); + + long getMillisSinceLastExec(); + + virtual std::span getMemoryView(); + + virtual void restore(const std::string& snapshotKey); + + faabric::Message& getBoundMessage(); + + bool isExecuting(); + + bool isShutdown() { return _isShutdown; } + + void addChainedMessage(const faabric::Message& msg); + + const faabric::Message& getChainedMessage(int messageId); + + std::set getChainedMessageIds(); + + std::vector mergeDirtyRegions( + const Message& msg, + const std::vector& extraDirtyPages = {}); + + // FIXME: what is the right visibility? + void setThreadResult(faabric::Message& msg, + int32_t returnValue, + const std::string& key, + const std::vector& diffs); + + virtual void setMemorySize(size_t newSize); + + protected: + virtual size_t getMaxMemorySize(); + + faabric::Message boundMessage; + + faabric::snapshot::SnapshotRegistry& reg; + + std::shared_ptr tracker; + + uint32_t threadPoolSize = 0; + + std::map> chainedMessages; + + private: + // ---- Accounting ---- + std::atomic claimed = false; + std::atomic _isShutdown = false; + std::atomic batchCounter = 0; + std::atomic threadBatchCounter = 0; + faabric::util::TimePoint lastExec; + + // ---- Application threads ---- + std::shared_mutex threadExecutionMutex; + std::vector dirtyRegions; + std::vector> threadLocalDirtyRegions; + void deleteMainThreadSnapshot(const faabric::Message& msg); + + // ---- Function execution thread pool ---- + std::mutex threadsMutex; + std::vector> threadPoolThreads; + std::set availablePoolThreads; + + std::vector> threadTaskQueues; + + void threadPoolThread(std::stop_token st, int threadPoolIdx); +}; +} diff --git a/include/faabric/scheduler/ExecutorContext.h b/include/faabric/executor/ExecutorContext.h similarity index 93% rename from include/faabric/scheduler/ExecutorContext.h rename to include/faabric/executor/ExecutorContext.h index 42d1a6449..3b8a97416 100644 --- a/include/faabric/scheduler/ExecutorContext.h +++ b/include/faabric/executor/ExecutorContext.h @@ -1,9 +1,10 @@ #pragma once +#include #include -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class ExecutorContextException : public faabric::util::FaabricException { diff --git a/include/faabric/scheduler/ExecutorFactory.h b/include/faabric/executor/ExecutorFactory.h similarity index 81% rename from include/faabric/scheduler/ExecutorFactory.h rename to include/faabric/executor/ExecutorFactory.h index bb71d67ce..ba549d2c8 100644 --- a/include/faabric/scheduler/ExecutorFactory.h +++ b/include/faabric/executor/ExecutorFactory.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class ExecutorFactory { diff --git a/include/faabric/executor/ExecutorTask.h b/include/faabric/executor/ExecutorTask.h new file mode 100644 index 000000000..ca81f85dc --- /dev/null +++ b/include/faabric/executor/ExecutorTask.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +namespace faabric::executor { + +class ExecutorTask +{ + public: + ExecutorTask() = default; + + ExecutorTask(int messageIndexIn, + std::shared_ptr reqIn); + + // Delete everything copy-related, default everything move-related + ExecutorTask(const ExecutorTask& other) = delete; + + ExecutorTask& operator=(const ExecutorTask& other) = delete; + + ExecutorTask(ExecutorTask&& other) = default; + + ExecutorTask& operator=(ExecutorTask&& other) = default; + + std::shared_ptr req; + int messageIndex = 0; +}; +} diff --git a/include/faabric/runner/FaabricMain.h b/include/faabric/runner/FaabricMain.h index 6a5fa6f72..b34e3893c 100644 --- a/include/faabric/runner/FaabricMain.h +++ b/include/faabric/runner/FaabricMain.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include @@ -12,7 +12,7 @@ namespace faabric::runner { class FaabricMain { public: - FaabricMain(std::shared_ptr fac); + FaabricMain(std::shared_ptr fac); void startBackground(); diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index ddccb4e0d..0a0092c5b 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -19,137 +20,6 @@ class Scheduler; Scheduler& getScheduler(); -class ExecutorTask -{ - public: - ExecutorTask() = default; - - ExecutorTask(int messageIndexIn, - std::shared_ptr reqIn); - - // Delete everything copy-related, default everything move-related - ExecutorTask(const ExecutorTask& other) = delete; - - ExecutorTask& operator=(const ExecutorTask& other) = delete; - - ExecutorTask(ExecutorTask&& other) = default; - - ExecutorTask& operator=(ExecutorTask&& other) = default; - - std::shared_ptr req; - int messageIndex = 0; -}; - -class ChainedCallException : public faabric::util::FaabricException -{ - public: - explicit ChainedCallException(std::string message) - : FaabricException(std::move(message)) - {} -}; - -class Executor -{ - public: - std::string id; - - explicit Executor(faabric::Message& msg); - - // Must be marked virtual to permit proper calling of subclass destructors - virtual ~Executor(); - - std::vector> executeThreads( - std::shared_ptr req, - const std::vector& mergeRegions); - - void executeTasks(std::vector msgIdxs, - std::shared_ptr req); - - virtual void shutdown(); - - virtual void reset(faabric::Message& msg); - - virtual int32_t executeTask( - int threadPoolIdx, - int msgIdx, - std::shared_ptr req); - - bool tryClaim(); - - void claim(); - - void releaseClaim(); - - std::shared_ptr getMainThreadSnapshot( - faabric::Message& msg, - bool createIfNotExists = false); - - long getMillisSinceLastExec(); - - virtual std::span getMemoryView(); - - virtual void restore(const std::string& snapshotKey); - - faabric::Message& getBoundMessage(); - - bool isExecuting(); - - bool isShutdown() { return _isShutdown; } - - void addChainedMessage(const faabric::Message& msg); - - const faabric::Message& getChainedMessage(int messageId); - - std::set getChainedMessageIds(); - - // This method merges all the thread-local dirty regions and returns a - // set of diffs. It must be called once per executor, once all other - // threads in the local batch have finished executing - std::vector mergeDirtyRegions( - const Message& msg, - const std::vector& extraDirtyPages = {}); - - virtual void setMemorySize(size_t newSize); - - protected: - virtual size_t getMaxMemorySize(); - - faabric::Message boundMessage; - - Scheduler& sch; - - faabric::snapshot::SnapshotRegistry& reg; - - std::shared_ptr tracker; - - uint32_t threadPoolSize = 0; - - std::map> chainedMessages; - - private: - // ---- Accounting ---- - std::atomic claimed = false; - std::atomic _isShutdown = false; - std::atomic batchCounter = 0; - std::atomic threadBatchCounter = 0; - faabric::util::TimePoint lastExec; - - // ---- Application threads ---- - std::shared_mutex threadExecutionMutex; - std::vector dirtyRegions; - std::vector> threadLocalDirtyRegions; - void deleteMainThreadSnapshot(const faabric::Message& msg); - - // ---- Function execution thread pool ---- - std::mutex threadsMutex; - std::vector> threadPoolThreads; - std::set availablePoolThreads; - - std::vector> threadTaskQueues; - - void threadPoolThread(std::stop_token st, int threadPoolIdx); -}; - /** * Background thread that periodically checks to see if any executors have * become stale (i.e. not handled any requests in a given timeout). If any are @@ -185,19 +55,10 @@ class Scheduler long getFunctionExecutorCount(const faabric::Message& msg); - void flushLocally(); - // ---------------------------------- // Message results // ---------------------------------- - void setFunctionResult(faabric::Message& msg); - - void setThreadResult(faabric::Message& msg, - int32_t returnValue, - const std::string& key, - const std::vector& diffs); - /** * Caches a message along with the thread result, to allow the thread result * to refer to data held in that message (i.e. snapshot diffs). The message @@ -250,7 +111,9 @@ class Scheduler std::atomic _isShutdown = false; // ---- Executors ---- - std::unordered_map>> + std::unordered_map< + std::string, + std::vector>> executors; // ---- Threads ---- @@ -265,7 +128,7 @@ class Scheduler // ---- Actual scheduling ---- SchedulerReaperThread reaperThread; - std::shared_ptr claimExecutor( + std::shared_ptr claimExecutor( faabric::Message& msg, faabric::util::FullLock& schedulerLock); diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt new file mode 100644 index 000000000..ec2619186 --- /dev/null +++ b/src/executor/CMakeLists.txt @@ -0,0 +1,14 @@ +faabric_lib(executor + Executor.cpp + ExecutorContext.cpp + ExecutorFactory.cpp + ExecutorTask.cpp +) + +# FIXME: do we need all these deps here? +target_link_libraries(executor PRIVATE + faabric::scheduling_util + faabric::snapshot + faabric::state + faabric::redis +) diff --git a/src/scheduler/Executor.cpp b/src/executor/Executor.cpp similarity index 85% rename from src/scheduler/Executor.cpp rename to src/executor/Executor.cpp index f01e14f34..aa1447c91 100644 --- a/src/scheduler/Executor.cpp +++ b/src/executor/Executor.cpp @@ -1,8 +1,11 @@ #include +#include +#include +#include #include +#include #include -#include -#include +#include #include #include #include @@ -29,18 +32,11 @@ #define POOL_SHUTDOWN -1 -namespace faabric::scheduler { - -ExecutorTask::ExecutorTask(int messageIndexIn, - std::shared_ptr reqIn) - : req(std::move(reqIn)) - , messageIndex(messageIndexIn) -{} +namespace faabric::executor { // TODO - avoid the copy of the message here? Executor::Executor(faabric::Message& msg) : boundMessage(msg) - , sch(getScheduler()) , reg(faabric::snapshot::getSnapshotRegistry()) , tracker(faabric::util::getDirtyTracker()) , threadPoolSize(faabric::util::getUsableCores()) @@ -110,90 +106,8 @@ Executor::~Executor() } } -// TODO(rm-executeThreads): get rid of this method here -std::vector> Executor::executeThreads( - std::shared_ptr req, - const std::vector& mergeRegions) -{ - SPDLOG_DEBUG("Executor {} executing {} threads", id, req->messages_size()); - - std::string funcStr = faabric::util::funcToString(req); - bool isSingleHost = req->singlehost(); - - // Do snapshotting if not on a single host - faabric::Message& msg = req->mutable_messages()->at(0); - std::shared_ptr snap = nullptr; - if (!isSingleHost) { - snap = getMainThreadSnapshot(msg, true); - - // Get dirty regions since last batch of threads - std::span memView = getMemoryView(); - tracker->stopTracking(memView); - tracker->stopThreadLocalTracking(memView); - - // If this is the first batch, these dirty regions will be empty - std::vector dirtyRegions = tracker->getBothDirtyPages(memView); - - // Apply changes to snapshot - snap->fillGapsWithBytewiseRegions(); - std::vector updates = - snap->diffWithDirtyRegions(memView, dirtyRegions); - - if (updates.empty()) { - SPDLOG_DEBUG( - "No updates to main thread snapshot for {} over {} pages", - faabric::util::funcToString(msg, false), - dirtyRegions.size()); - } else { - SPDLOG_DEBUG("Updating main thread snapshot for {} with {} diffs", - faabric::util::funcToString(msg, false), - updates.size()); - snap->applyDiffs(updates); - } - - // Clear merge regions, not persisted between batches of threads - snap->clearMergeRegions(); - - // Now we have to add any merge regions we've been saving up for this - // next batch of threads - for (const auto& mr : mergeRegions) { - snap->addMergeRegion( - mr.offset, mr.length, mr.dataType, mr.operation); - } - } - - // Invoke threads and await - // TODO: for the time being, threads may execute for a long time so we - // are a bit more generous with the timeout - auto decision = faabric::planner::getPlannerClient().callFunctions(req); - std::vector> results = sch.awaitThreadResults( - req, 10 * faabric::util::getSystemConfig().boundTimeout); - - // Perform snapshot updates if not on single host - if (!isSingleHost) { - // Add the diffs corresponding to this executor - auto diffs = mergeDirtyRegions(msg); - snap->queueDiffs(diffs); - - // Write queued changes to snapshot - int nWritten = snap->writeQueuedDiffs(); - - // Remap memory to snapshot if it's been updated - std::span memView = getMemoryView(); - if (nWritten > 0) { - setMemorySize(snap->getSize()); - snap->mapToMemory(memView); - } - - // Start tracking again - memView = getMemoryView(); - tracker->startTracking(memView); - tracker->startThreadLocalTracking(memView); - } - - return results; -} - +// TODO(thread-opt): get rid of this method here and move to +// PlannerClient::callFunctions() void Executor::executeTasks(std::vector msgIdxs, std::shared_ptr req) { @@ -353,6 +267,42 @@ void Executor::deleteMainThreadSnapshot(const faabric::Message& msg) } */ +void Executor::setThreadResult( + faabric::Message& msg, + int32_t returnValue, + const std::string& key, + const std::vector& diffs) +{ + bool isMaster = + msg.mainhost() == faabric::util::getSystemConfig().endpointHost; + if (isMaster) { + if (!diffs.empty()) { + // On main we queue the diffs locally directly, on a remote + // host we push them back to main + SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} (group {})", + diffs.size(), + faabric::util::funcToString(msg, false), + key, + msg.groupid()); + + auto snap = reg.getSnapshot(key); + + // Here we don't have ownership over all of the snapshot diff data, + // but that's ok as the executor memory will outlast the snapshot + // merging operation. + snap->queueDiffs(diffs); + } + } else { + // Push thread result and diffs together + faabric::snapshot::getSnapshotClient(msg.mainhost()) + ->pushThreadResult(msg.appid(), msg.id(), returnValue, key, diffs); + } + + // Finally, set the message result in the planner + faabric::planner::getPlannerClient().setMessageResult( + std::make_shared(msg)); +} + void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) { SPDLOG_DEBUG("Thread pool thread {}:{} starting up", id, threadPoolIdx); @@ -517,8 +467,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // If this is not a threads request and last in its batch, it may be // the main function (thread) in a threaded application, in which case // we want to stop any tracking and delete the main thread snapshot - // TODO(rm-executeThreads): this should disappear when pthreads do - // not call executeThreads anymore + /* FIXME: remove me if (!isThreads && isLastThreadInExecutor) { // Stop tracking memory std::span memView = getMemoryView(); @@ -532,6 +481,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // deleteMainThreadSnapshot(msg); } } + */ // If this batch is finished, reset the executor and release its // claim. Note that we have to release the claim _after_ resetting, @@ -564,13 +514,14 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // Set non-final thread result if (isLastThreadInBatch) { // Include diffs if this is the last one - sch.setThreadResult(msg, returnValue, mainThreadSnapKey, diffs); + setThreadResult(msg, returnValue, mainThreadSnapKey, diffs); } else { - sch.setThreadResult(msg, returnValue, "", {}); + setThreadResult(msg, returnValue, "", {}); } } else { // Set normal function result - sch.setFunctionResult(msg); + faabric::planner::getPlannerClient().setMessageResult( + std::make_shared(msg)); } } } diff --git a/src/scheduler/ExecutorContext.cpp b/src/executor/ExecutorContext.cpp similarity index 91% rename from src/scheduler/ExecutorContext.cpp rename to src/executor/ExecutorContext.cpp index eaee7a1b8..1c44453aa 100644 --- a/src/scheduler/ExecutorContext.cpp +++ b/src/executor/ExecutorContext.cpp @@ -1,6 +1,6 @@ -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { static thread_local std::shared_ptr context = nullptr; diff --git a/src/scheduler/ExecutorFactory.cpp b/src/executor/ExecutorFactory.cpp similarity index 84% rename from src/scheduler/ExecutorFactory.cpp rename to src/executor/ExecutorFactory.cpp index 01d9a2490..e778f71f6 100644 --- a/src/scheduler/ExecutorFactory.cpp +++ b/src/executor/ExecutorFactory.cpp @@ -1,7 +1,7 @@ -#include +#include #include -namespace faabric::scheduler { +namespace faabric::executor { static std::shared_ptr _factory; diff --git a/src/executor/ExecutorTask.cpp b/src/executor/ExecutorTask.cpp new file mode 100644 index 000000000..c279f75d9 --- /dev/null +++ b/src/executor/ExecutorTask.cpp @@ -0,0 +1,10 @@ +#include + +namespace faabric::executor { + +ExecutorTask::ExecutorTask(int messageIndexIn, + std::shared_ptr reqIn) + : req(std::move(reqIn)) + , messageIndex(messageIndexIn) +{} +} diff --git a/src/planner/PlannerClient.cpp b/src/planner/PlannerClient.cpp index 4300dc3f8..00d2c39dd 100644 --- a/src/planner/PlannerClient.cpp +++ b/src/planner/PlannerClient.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -129,6 +130,11 @@ void PlannerClient::removeHost(std::shared_ptr req) void PlannerClient::setMessageResult(std::shared_ptr msg) { + // Set finish timestamp + msg->set_finishtimestamp(faabric::util::getGlobalClock().epochMillis()); + + // Let the planner know this function has finished execution. This will + // wake any thread waiting on this result asyncSend(PlannerCalls::SetMessageResult, msg.get()); } diff --git a/src/runner/FaabricMain.cpp b/src/runner/FaabricMain.cpp index 0a900364c..f0a762f92 100644 --- a/src/runner/FaabricMain.cpp +++ b/src/runner/FaabricMain.cpp @@ -1,6 +1,6 @@ +#include #include #include -#include #include #include #include @@ -9,10 +9,10 @@ namespace faabric::runner { FaabricMain::FaabricMain( - std::shared_ptr execFactory) + std::shared_ptr execFactory) : stateServer(faabric::state::getGlobalState()) { - faabric::scheduler::setExecutorFactory(execFactory); + faabric::executor::setExecutorFactory(execFactory); } void FaabricMain::startBackground() diff --git a/src/scheduler/CMakeLists.txt b/src/scheduler/CMakeLists.txt index 468638b15..c0717cf4a 100644 --- a/src/scheduler/CMakeLists.txt +++ b/src/scheduler/CMakeLists.txt @@ -1,12 +1,10 @@ faabric_lib(scheduler - ExecutorContext.cpp - ExecutorFactory.cpp - Executor.cpp FunctionCallClient.cpp FunctionCallServer.cpp Scheduler.cpp ) +# FIXME: do we need all these deps here? target_link_libraries(scheduler PRIVATE faabric::scheduling_util faabric::snapshot diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index 70b56890a..733738494 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -54,11 +55,17 @@ std::unique_ptr FunctionCallServer::doSyncRecv( std::unique_ptr FunctionCallServer::recvFlush( std::span buffer) { + SPDLOG_INFO("Flushing host {}", + faabric::util::getSystemConfig().endpointHost); + // Clear out any cached state faabric::state::getGlobalState().forceClearAll(false); // Clear the scheduler - scheduler.flushLocally(); + faabric::scheduler::getScheduler().reset(); + + // Clear the executor factory + faabric::executor::getExecutorFactory()->flushHost(); return std::make_unique(); } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index e070d15d0..70d777b19 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -1,8 +1,8 @@ #include +#include #include #include #include -#include #include #include #include @@ -179,8 +179,9 @@ int Scheduler::reapStaleExecutors() int nReaped = 0; for (auto& execPair : executors) { std::string key = execPair.first; - std::vector>& execs = execPair.second; - std::vector> toRemove; + std::vector>& execs = + execPair.second; + std::vector> toRemove; if (execs.empty()) { continue; @@ -268,10 +269,10 @@ void Scheduler::executeBatch(std::shared_ptr req) if (isThreads) { // Threads use the existing executor. We assume there's only // one running at a time. - std::vector>& thisExecutors = - executors[funcStr]; + std::vector>& + thisExecutors = executors[funcStr]; - std::shared_ptr e = nullptr; + std::shared_ptr e = nullptr; if (thisExecutors.empty()) { // Create executor if not exists e = claimExecutor(*req->mutable_messages(0), lock); @@ -297,7 +298,8 @@ void Scheduler::executeBatch(std::shared_ptr req) for (int i = 0; i < nMessages; i++) { faabric::Message& localMsg = req->mutable_messages()->at(i); - std::shared_ptr e = claimExecutor(localMsg, lock); + std::shared_ptr e = + claimExecutor(localMsg, lock); e->executeTasks({ i }, req); } } @@ -315,18 +317,18 @@ std::vector Scheduler::getRecordedMessages() return recordedMessages; } -std::shared_ptr Scheduler::claimExecutor( +std::shared_ptr Scheduler::claimExecutor( faabric::Message& msg, faabric::util::FullLock& schedulerLock) { std::string funcStr = faabric::util::funcToString(msg, false); - std::vector>& thisExecutors = executors[funcStr]; + std::vector>& thisExecutors = + executors[funcStr]; - std::shared_ptr factory = - getExecutorFactory(); + auto factory = faabric::executor::getExecutorFactory(); - std::shared_ptr claimed = nullptr; + std::shared_ptr claimed = nullptr; for (auto& e : thisExecutors) { if (e->tryClaim()) { claimed = e; @@ -365,63 +367,6 @@ std::string Scheduler::getThisHost() return thisHost; } -void Scheduler::flushLocally() -{ - SPDLOG_INFO("Flushing host {}", - faabric::util::getSystemConfig().endpointHost); - - // Reset this scheduler - reset(); - - // Flush the host - getExecutorFactory()->flushHost(); -} - -void Scheduler::setFunctionResult(faabric::Message& msg) -{ - // Set finish timestamp - msg.set_finishtimestamp(faabric::util::getGlobalClock().epochMillis()); - - // Let the planner know this function has finished execution. This will - // wake any thread waiting on this result - faabric::planner::getPlannerClient().setMessageResult( - std::make_shared(msg)); -} - -void Scheduler::setThreadResult( - faabric::Message& msg, - int32_t returnValue, - const std::string& key, - const std::vector& diffs) -{ - bool isMaster = msg.mainhost() == conf.endpointHost; - if (isMaster) { - if (!diffs.empty()) { - // On main we queue the diffs locally directly, on a remote - // host we push them back to main - SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} (group {})", - diffs.size(), - faabric::util::funcToString(msg, false), - key, - msg.groupid()); - - auto snap = reg.getSnapshot(key); - - // Here we don't have ownership over all of the snapshot diff data, - // but that's ok as the executor memory will outlast the snapshot - // merging operation. - snap->queueDiffs(diffs); - } - } else { - // Push thread result and diffs together - getSnapshotClient(msg.mainhost()) - ->pushThreadResult(msg.appid(), msg.id(), returnValue, key, diffs); - } - - // Finally, set the message result in the planner - setFunctionResult(msg); -} - void Scheduler::setThreadResultLocally(uint32_t appId, uint32_t msgId, int32_t returnValue, diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/executor/test_executor.cpp similarity index 97% rename from tests/test/scheduler/test_executor.cpp rename to tests/test/executor/test_executor.cpp index 06853251a..5ffcc03db 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/executor/test_executor.cpp @@ -2,12 +2,10 @@ #include "fixtures.h" -#include - +#include +#include #include #include -#include -#include #include #include #include @@ -19,7 +17,9 @@ #include #include -using namespace faabric::scheduler; +#include + +using namespace faabric::executor; using namespace faabric::util; namespace tests { @@ -79,6 +79,7 @@ int32_t TestExecutor::executeTask( { faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx); std::string funcStr = faabric::util::funcToString(msg, true); + auto& sch = faabric::scheduler::getScheduler(); bool isThread = reqOrig->type() == faabric::BatchExecuteRequest::THREADS; @@ -239,8 +240,7 @@ int32_t TestExecutor::executeTask( } if (msg.function() == "context-check") { - std::shared_ptr ctx = - faabric::scheduler::ExecutorContext::get(); + std::shared_ptr ctx = ExecutorContext::get(); if (ctx == nullptr) { SPDLOG_ERROR("Executor context is null"); return 999; @@ -398,7 +398,7 @@ TEST_CASE_METHOD(TestExecutorFixture, REQUIRE(result.outputdata() == expected); // Flush - sch.flushLocally(); + getExecutorFactory()->flushHost(); } } @@ -478,7 +478,7 @@ TEST_CASE_METHOD(TestExecutorFixture, } // Check sent to other host if necessary - auto batchRequests = getBatchRequests(); + auto batchRequests = faabric::scheduler::getBatchRequests(); // Check the hosts match up REQUIRE(restoreCount == expectedRestoreCount); @@ -742,12 +742,9 @@ TEST_CASE_METHOD(TestExecutorFixture, faabric::Message msgA = faabric::util::messageFactory("foo", "bar"); faabric::Message msgB = faabric::util::messageFactory("foo", "bar"); - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr execA = - fac->createExecutor(msgA); - std::shared_ptr execB = - fac->createExecutor(msgB); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr execA = fac->createExecutor(msgA); + std::shared_ptr execB = fac->createExecutor(msgB); // Claim one REQUIRE(execA->tryClaim()); @@ -783,10 +780,8 @@ TEST_CASE_METHOD(TestExecutorFixture, localHost.set_usedslots(5); sch.setThisHostResources(localHost); - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr exec = - fac->createExecutor(msg); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr exec = fac->createExecutor(msg); long millisA = exec->getMillisSinceLastExec(); @@ -1189,9 +1184,8 @@ TEST_CASE_METHOD(TestExecutorFixture, "Test executor restore", "[executor]") snap->copyInData(dataB, offsetB); // Create an executor - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr exec = fac->createExecutor(m); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr exec = fac->createExecutor(m); // Restore from snapshot exec->restore(snapKey); @@ -1226,9 +1220,8 @@ TEST_CASE_METHOD(TestExecutorFixture, std::string snapKey = faabric::util::getMainThreadSnapshotKey(m); // Create an executor - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr exec = fac->createExecutor(m); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr exec = fac->createExecutor(m); // Get a pointer to the TestExecutor so we can override the max memory auto testExec = std::static_pointer_cast(exec); @@ -1290,7 +1283,7 @@ TEST_CASE_METHOD(TestExecutorFixture, auto& firstMsg = *req->mutable_messages(0); // Create an executor - auto fac = faabric::scheduler::getExecutorFactory(); + auto fac = getExecutorFactory(); auto exec = fac->createExecutor(firstMsg); // At the begining there are no chained messages @@ -1335,6 +1328,9 @@ TEST_CASE_METHOD(TestExecutorFixture, auto exec = std::make_shared(*req->mutable_messages(0)); // Execute directly calling the executor + SPDLOG_ERROR("pls fixme"); + throw std::runtime_error("Fix this test!"); + /* auto results = exec->executeThreads(req, {}); // Check results @@ -1345,5 +1341,6 @@ TEST_CASE_METHOD(TestExecutorFixture, // Shut down executor exec->shutdown(); + */ } } diff --git a/tests/test/scheduler/test_executor_context.cpp b/tests/test/executor/test_executor_context.cpp similarity index 95% rename from tests/test/scheduler/test_executor_context.cpp rename to tests/test/executor/test_executor_context.cpp index 72e192df7..58e2ea912 100644 --- a/tests/test/scheduler/test_executor_context.cpp +++ b/tests/test/executor/test_executor_context.cpp @@ -3,10 +3,10 @@ #include "faabric_utils.h" #include "fixtures.h" -#include +#include #include -using namespace faabric::scheduler; +using namespace faabric::executor; namespace tests { diff --git a/tests/test/scheduler/test_executor_reaping.cpp b/tests/test/executor/test_executor_reaping.cpp similarity index 100% rename from tests/test/scheduler/test_executor_reaping.cpp rename to tests/test/executor/test_executor_reaping.cpp diff --git a/tests/utils/DummyExecutor.h b/tests/utils/DummyExecutor.h index c5a43c9fd..b544507c0 100644 --- a/tests/utils/DummyExecutor.h +++ b/tests/utils/DummyExecutor.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class DummyExecutor final : public Executor { diff --git a/tests/utils/DummyExecutorFactory.cpp b/tests/utils/DummyExecutorFactory.cpp index 3d49b513c..e3c8059e6 100644 --- a/tests/utils/DummyExecutorFactory.cpp +++ b/tests/utils/DummyExecutorFactory.cpp @@ -3,7 +3,7 @@ #include -namespace faabric::scheduler { +namespace faabric::executor { std::shared_ptr DummyExecutorFactory::createExecutor( faabric::Message& msg) diff --git a/tests/utils/DummyExecutorFactory.h b/tests/utils/DummyExecutorFactory.h index 10d95086c..c901da0b3 100644 --- a/tests/utils/DummyExecutorFactory.h +++ b/tests/utils/DummyExecutorFactory.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class DummyExecutorFactory : public ExecutorFactory { diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 9c1f9b0d8..1af23f948 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include #include #include @@ -16,8 +18,6 @@ #include #include #include -#include -#include #include #include #include @@ -271,7 +271,7 @@ class ExecutorContextFixture public: ExecutorContextFixture() {} - ~ExecutorContextFixture() { faabric::scheduler::ExecutorContext::unset(); } + ~ExecutorContextFixture() { faabric::executor::ExecutorContext::unset(); } /** * Creates a batch request and sets up the associated context @@ -293,13 +293,13 @@ class ExecutorContextFixture */ void setUpContext(std::shared_ptr req) { - faabric::scheduler::ExecutorContext::set(nullptr, req, 0); + faabric::executor::ExecutorContext::set(nullptr, req, 0); } }; #define TEST_EXECUTOR_DEFAULT_MEMORY_SIZE (10 * faabric::util::HOST_PAGE_SIZE) -class TestExecutor final : public faabric::scheduler::Executor +class TestExecutor final : public faabric::executor::Executor { public: TestExecutor(faabric::Message& msg); @@ -324,10 +324,10 @@ class TestExecutor final : public faabric::scheduler::Executor std::shared_ptr reqOrig) override; }; -class TestExecutorFactory : public faabric::scheduler::ExecutorFactory +class TestExecutorFactory : public faabric::executor::ExecutorFactory { protected: - std::shared_ptr createExecutor( + std::shared_ptr createExecutor( faabric::Message& msg) override; }; @@ -399,9 +399,9 @@ class MpiBaseTestFixture , req(faabric::util::batchExecFactory(user, func, 1)) , msg(*req->mutable_messages(0)) { - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + std::shared_ptr fac = + std::make_shared(); + faabric::executor::setExecutorFactory(fac); msg.set_mpiworldid(worldId); msg.set_mpiworldsize(worldSize); diff --git a/tests/utils/system_utils.cpp b/tests/utils/system_utils.cpp deleted file mode 100644 index 665fdf5d1..000000000 --- a/tests/utils/system_utils.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#include "DummyExecutorFactory.h" -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace tests { -void cleanFaabric() -{ - faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); - - // Clear out Redis - redis::Redis::getState().flushAll(); - redis::Redis::getQueue().flushAll(); - - // Clear out any cached state, do so for both modes - std::string& originalStateMode = conf.stateMode; - conf.stateMode = "inmemory"; - state::getGlobalState().forceClearAll(true); - conf.stateMode = "redis"; - state::getGlobalState().forceClearAll(true); - conf.stateMode = originalStateMode; - - // Reset scheduler - scheduler::Scheduler& sch = scheduler::getScheduler(); - sch.shutdown(); - sch.addHostToGlobalSet(); - - // Give scheduler enough resources - faabric::HostResources res; - res.set_slots(10); - sch.setThisHostResources(res); - - // Clear snapshots - faabric::snapshot::getSnapshotRegistry().clear(); - - // Reset system config - conf.reset(); - - // Set test mode back on and mock mode off - faabric::util::setTestMode(true); - faabric::util::setMockMode(false); - faabric::scheduler::clearMockRequests(); - faabric::snapshot::clearMockSnapshotRequests(); - - // Set up dummy executor factory - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); - - // Clear out MPI worlds - faabric::mpi::MpiWorldRegistry& mpiRegistry = - faabric::mpi::getMpiWorldRegistry(); - mpiRegistry.clear(); -} -}