Skip to content

Commit

Permalink
executor: move executor code to separate top-level directory
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Jan 16, 2024
1 parent 7fba9d8 commit 628e19b
Show file tree
Hide file tree
Showing 25 changed files with 310 additions and 435 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ endfunction()

add_subdirectory(src/batch-scheduler)
add_subdirectory(src/endpoint)
add_subdirectory(src/executor)
add_subdirectory(src/flat)
add_subdirectory(src/mpi)
add_subdirectory(src/planner)
Expand All @@ -121,6 +122,7 @@ add_library(faabric
faabric.cpp
$<TARGET_OBJECTS:batch_scheduler_obj>
$<TARGET_OBJECTS:endpoint_obj>
$<TARGET_OBJECTS:executor_obj>
$<TARGET_OBJECTS:flat_obj>
$<TARGET_OBJECTS:mpi_obj>
$<TARGET_OBJECTS:planner_obj>
Expand Down
119 changes: 119 additions & 0 deletions include/faabric/executor/Executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#pragma once

#include <faabric/executor/ExecutorTask.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/clock.h>
#include <faabric/util/exception.h>
#include <faabric/util/queue.h>
#include <faabric/util/snapshot.h>

namespace faabric::executor {

// Exception type that carries a string message and derives from
// faabric::util::FaabricException.
// NOTE(review): the name suggests it reports failures related to chained
// function calls - confirm against the throw sites, which are not visible in
// this header.
class ChainedCallException : public faabric::util::FaabricException
{
public:
// Takes the message by value and moves it into the base-class constructor.
// Explicit, so a std::string is never implicitly converted to this type.
explicit ChainedCallException(std::string message)
: FaabricException(std::move(message))
{}
};

// Base class for executors. It is intended to be subclassed: the destructor
// and several operations (shutdown, reset, executeTask, getMemoryView,
// restore, setMemorySize, getMaxMemorySize) are virtual.
//
// Only declarations are visible here; notes marked NOTE(review) are
// assumptions drawn from names and types and must be confirmed against the
// implementation.
class Executor
{
public:
// Identifier of this executor.
// NOTE(review): how the id is generated is not visible in this header.
std::string id;

// Constructs an executor from a message.
// NOTE(review): presumably this message is stored as boundMessage - confirm.
explicit Executor(faabric::Message& msg);

// Must be marked virtual to permit proper calling of subclass destructors
virtual ~Executor();

// Executes the messages of `req` selected by the indexes in `msgIdxs`.
// NOTE(review): presumably the tasks are handed to the thread pool through
// threadTaskQueues - confirm.
void executeTasks(std::vector<int> msgIdxs,
std::shared_ptr<faabric::BatchExecuteRequest> req);

// Overridable shutdown hook.
// NOTE(review): presumably sets _isShutdown (see isShutdown()) - confirm.
virtual void shutdown();

// Overridable reset hook that receives a message.
virtual void reset(faabric::Message& msg);

// Overridable execution of a single message: `msgIdx` is an index into
// `req`, and `threadPoolIdx` identifies the pool thread running the task.
// NOTE(review): the int32_t result is presumably the function's return
// value - confirm.
virtual int32_t executeTask(
int threadPoolIdx,
int msgIdx,
std::shared_ptr<faabric::BatchExecuteRequest> req);

// Claim management.
// NOTE(review): presumably backed by the `claimed` atomic flag, with
// tryClaim() reporting whether the claim succeeded - confirm.
bool tryClaim();

void claim();

void releaseClaim();

// Returns the snapshot data associated with the main thread for `msg`.
// `createIfNotExists` defaults to false.
// NOTE(review): behaviour when the snapshot is missing and
// createIfNotExists is false is not visible here.
std::shared_ptr<faabric::util::SnapshotData> getMainThreadSnapshot(
faabric::Message& msg,
bool createIfNotExists = false);

// NOTE(review): presumably milliseconds elapsed since `lastExec` - confirm.
long getMillisSinceLastExec();

// Overridable accessor returning a byte view.
// NOTE(review): presumably a view over the executor's memory - confirm.
virtual std::span<uint8_t> getMemoryView();

// Overridable restore from the snapshot identified by `snapshotKey`.
virtual void restore(const std::string& snapshotKey);

// Returns a mutable reference to a message.
// NOTE(review): presumably the `boundMessage` member - confirm.
faabric::Message& getBoundMessage();

bool isExecuting();

// Returns the current value of the _isShutdown atomic flag.
bool isShutdown() { return _isShutdown; }

// Chained-message bookkeeping.
// NOTE(review): presumably stored in `chainedMessages`, keyed by message
// id - confirm. Behaviour of getChainedMessage() for an unknown id is not
// visible here.
void addChainedMessage(const faabric::Message& msg);

const faabric::Message& getChainedMessage(int messageId);

std::set<unsigned int> getChainedMessageIds();

// This method merges all the thread-local dirty regions and returns a
// set of diffs. It must be called once per executor, once all other
// threads in the local batch have finished executing
std::vector<faabric::util::SnapshotDiff> mergeDirtyRegions(
const Message& msg,
const std::vector<char>& extraDirtyPages = {});

// FIXME: decide the right visibility for this method; it is currently
// public.
// Sets the result of a thread, given its message, return value, a key and
// a list of snapshot diffs.
// NOTE(review): the meaning of `key` is not visible here; presumably a
// snapshot key - confirm.
void setThreadResult(faabric::Message& msg,
int32_t returnValue,
const std::string& key,
const std::vector<faabric::util::SnapshotDiff>& diffs);

// Overridable setter for the memory size.
// NOTE(review): units are presumably bytes - confirm.
virtual void setMemorySize(size_t newSize);

protected:
// Overridable upper bound on memory size.
// NOTE(review): units are presumably bytes - confirm.
virtual size_t getMaxMemorySize();

// Message held by value by this executor.
faabric::Message boundMessage;

// Reference to a snapshot registry. As a reference member it must be bound
// at construction and cannot be reseated.
faabric::snapshot::SnapshotRegistry& reg;

// Shared handle to a dirty tracker.
std::shared_ptr<faabric::util::DirtyTracker> tracker;

// Defaults to 0.
// NOTE(review): presumably the number of pool threads - confirm.
uint32_t threadPoolSize = 0;

// Messages keyed by int.
// NOTE(review): the key is presumably the message id - confirm.
std::map<int, std::shared_ptr<faabric::Message>> chainedMessages;

private:
// ---- Accounting ----
// Atomic flags and counters, all initialised to false / 0.
std::atomic<bool> claimed = false;
std::atomic<bool> _isShutdown = false;
std::atomic<int> batchCounter = 0;
std::atomic<int> threadBatchCounter = 0;
// NOTE(review): presumably the time of the most recent execution, used by
// getMillisSinceLastExec() - confirm.
faabric::util::TimePoint lastExec;

// ---- Application threads ----
std::shared_mutex threadExecutionMutex;
// NOTE(review): presumably per-page dirty flags, one vector per thread in
// threadLocalDirtyRegions - confirm.
std::vector<char> dirtyRegions;
std::vector<std::vector<char>> threadLocalDirtyRegions;
void deleteMainThreadSnapshot(const faabric::Message& msg);

// ---- Function execution thread pool ----
std::mutex threadsMutex;
// Pool threads are std::jthread instances held through shared pointers.
std::vector<std::shared_ptr<std::jthread>> threadPoolThreads;
// NOTE(review): presumably the indexes of idle pool threads - confirm.
std::set<int> availablePoolThreads;

// Queues of ExecutorTask.
// NOTE(review): presumably one queue per pool thread, indexed by
// threadPoolIdx - confirm.
std::vector<faabric::util::Queue<ExecutorTask>> threadTaskQueues;

// Takes a stop token and a pool index.
// NOTE(review): presumably the function run by each pool jthread - confirm.
void threadPoolThread(std::stop_token st, int threadPoolIdx);
};
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#pragma once

#include <faabric/executor/Executor.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/exception.h>

namespace faabric::scheduler {
namespace faabric::executor {

class ExecutorContextException : public faabric::util::FaabricException
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include <faabric/scheduler/Scheduler.h>
#include <faabric/executor/Executor.h>

namespace faabric::scheduler {
namespace faabric::executor {

class ExecutorFactory
{
Expand Down
27 changes: 27 additions & 0 deletions include/faabric/executor/ExecutorTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <faabric/proto/faabric.pb.h>

namespace faabric::executor {

// Move-only value type pairing a shared batch execute request with an integer
// message index.
// NOTE(review): presumably `messageIndex` selects one message inside `req`,
// and instances are the work items placed on an executor's task queues -
// confirm against the users of this type.
class ExecutorTask
{
public:
// Default state: null `req` and `messageIndex` of 0.
ExecutorTask() = default;

// Constructs a task from a message index and a request.
// NOTE(review): defined out of line; presumably it just stores both
// arguments - confirm.
ExecutorTask(int messageIndexIn,
std::shared_ptr<BatchExecuteRequest> reqIn);

// Delete everything copy-related, default everything move-related
ExecutorTask(const ExecutorTask& other) = delete;

ExecutorTask& operator=(const ExecutorTask& other) = delete;

ExecutorTask(ExecutorTask&& other) = default;

ExecutorTask& operator=(ExecutorTask&& other) = default;

// Shared handle to the batch request.
std::shared_ptr<BatchExecuteRequest> req;
// Defaults to 0.
int messageIndex = 0;
};
}
4 changes: 2 additions & 2 deletions include/faabric/runner/FaabricMain.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/executor/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallServer.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/snapshot/SnapshotServer.h>
Expand All @@ -12,7 +12,7 @@ namespace faabric::runner {
class FaabricMain
{
public:
FaabricMain(std::shared_ptr<faabric::scheduler::ExecutorFactory> fac);
FaabricMain(std::shared_ptr<faabric::executor::ExecutorFactory> fac);

void startBackground();

Expand Down
147 changes: 5 additions & 142 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <faabric/executor/Executor.h>
#include <faabric/planner/PlannerClient.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/snapshot/SnapshotRegistry.h>
Expand All @@ -19,137 +20,6 @@ class Scheduler;

Scheduler& getScheduler();

// Move-only value type pairing a shared batch execute request with an integer
// message index.
// NOTE(review): presumably `messageIndex` selects one message inside `req` -
// confirm against the users of this type.
class ExecutorTask
{
public:
// Default state: null `req` and `messageIndex` of 0.
ExecutorTask() = default;

// Constructs a task from a message index and a request.
// NOTE(review): defined out of line; presumably it just stores both
// arguments - confirm.
ExecutorTask(int messageIndexIn,
std::shared_ptr<faabric::BatchExecuteRequest> reqIn);

// Delete everything copy-related, default everything move-related
ExecutorTask(const ExecutorTask& other) = delete;

ExecutorTask& operator=(const ExecutorTask& other) = delete;

ExecutorTask(ExecutorTask&& other) = default;

ExecutorTask& operator=(ExecutorTask&& other) = default;

// Shared handle to the batch request.
std::shared_ptr<faabric::BatchExecuteRequest> req;
// Defaults to 0.
int messageIndex = 0;
};

// Exception type that carries a string message and derives from
// faabric::util::FaabricException.
// NOTE(review): the name suggests it reports failures related to chained
// function calls - confirm against the throw sites.
class ChainedCallException : public faabric::util::FaabricException
{
public:
// Takes the message by value and moves it into the base-class constructor.
explicit ChainedCallException(std::string message)
: FaabricException(std::move(message))
{}
};

// Base class for executors. It is intended to be subclassed: the destructor
// and several operations (shutdown, reset, executeTask, getMemoryView,
// restore, setMemorySize, getMaxMemorySize) are virtual.
//
// Only declarations are visible here; notes marked NOTE(review) are
// assumptions drawn from names and types and must be confirmed against the
// implementation.
class Executor
{
public:
// Identifier of this executor.
// NOTE(review): how the id is generated is not visible in this header.
std::string id;

// Constructs an executor from a message.
// NOTE(review): presumably this message is stored as boundMessage - confirm.
explicit Executor(faabric::Message& msg);

// Must be marked virtual to permit proper calling of subclass destructors
virtual ~Executor();

// Executes the threads described by `req`, given a list of snapshot merge
// regions, and returns a vector of (uint32_t, int32_t) pairs.
// NOTE(review): the pairs are presumably (message id, return value) -
// confirm.
std::vector<std::pair<uint32_t, int32_t>> executeThreads(
std::shared_ptr<faabric::BatchExecuteRequest> req,
const std::vector<faabric::util::SnapshotMergeRegion>& mergeRegions);

// Executes the messages of `req` selected by the indexes in `msgIdxs`.
void executeTasks(std::vector<int> msgIdxs,
std::shared_ptr<faabric::BatchExecuteRequest> req);

// Overridable shutdown hook.
virtual void shutdown();

// Overridable reset hook that receives a message.
virtual void reset(faabric::Message& msg);

// Overridable execution of a single message: `msgIdx` is an index into
// `req`, and `threadPoolIdx` identifies the pool thread running the task.
virtual int32_t executeTask(
int threadPoolIdx,
int msgIdx,
std::shared_ptr<faabric::BatchExecuteRequest> req);

// Claim management.
// NOTE(review): presumably backed by the `claimed` atomic flag - confirm.
bool tryClaim();

void claim();

void releaseClaim();

// Returns the snapshot data associated with the main thread for `msg`.
// `createIfNotExists` defaults to false.
std::shared_ptr<faabric::util::SnapshotData> getMainThreadSnapshot(
faabric::Message& msg,
bool createIfNotExists = false);

// NOTE(review): presumably milliseconds elapsed since `lastExec` - confirm.
long getMillisSinceLastExec();

// Overridable accessor returning a byte view.
// NOTE(review): presumably a view over the executor's memory - confirm.
virtual std::span<uint8_t> getMemoryView();

// Overridable restore from the snapshot identified by `snapshotKey`.
virtual void restore(const std::string& snapshotKey);

// Returns a mutable reference to a message.
// NOTE(review): presumably the `boundMessage` member - confirm.
faabric::Message& getBoundMessage();

bool isExecuting();

// Returns the current value of the _isShutdown atomic flag.
bool isShutdown() { return _isShutdown; }

// Chained-message bookkeeping.
// NOTE(review): presumably stored in `chainedMessages` - confirm.
void addChainedMessage(const faabric::Message& msg);

const faabric::Message& getChainedMessage(int messageId);

std::set<unsigned int> getChainedMessageIds();

// This method merges all the thread-local dirty regions and returns a
// set of diffs. It must be called once per executor, once all other
// threads in the local batch have finished executing
std::vector<faabric::util::SnapshotDiff> mergeDirtyRegions(
const Message& msg,
const std::vector<char>& extraDirtyPages = {});

// Overridable setter for the memory size.
// NOTE(review): units are presumably bytes - confirm.
virtual void setMemorySize(size_t newSize);

protected:
// Overridable upper bound on memory size.
// NOTE(review): units are presumably bytes - confirm.
virtual size_t getMaxMemorySize();

// Message held by value by this executor.
faabric::Message boundMessage;

// Reference to a Scheduler. As a reference member it must be bound at
// construction and cannot be reseated.
Scheduler& sch;

// Reference to a snapshot registry, bound at construction.
faabric::snapshot::SnapshotRegistry& reg;

// Shared handle to a dirty tracker.
std::shared_ptr<faabric::util::DirtyTracker> tracker;

// Defaults to 0.
// NOTE(review): presumably the number of pool threads - confirm.
uint32_t threadPoolSize = 0;

// Messages keyed by int.
// NOTE(review): the key is presumably the message id - confirm.
std::map<int, std::shared_ptr<faabric::Message>> chainedMessages;

private:
// ---- Accounting ----
// Atomic flags and counters, all initialised to false / 0.
std::atomic<bool> claimed = false;
std::atomic<bool> _isShutdown = false;
std::atomic<int> batchCounter = 0;
std::atomic<int> threadBatchCounter = 0;
faabric::util::TimePoint lastExec;

// ---- Application threads ----
std::shared_mutex threadExecutionMutex;
std::vector<char> dirtyRegions;
std::vector<std::vector<char>> threadLocalDirtyRegions;
void deleteMainThreadSnapshot(const faabric::Message& msg);

// ---- Function execution thread pool ----
std::mutex threadsMutex;
// Pool threads are std::jthread instances held through shared pointers.
std::vector<std::shared_ptr<std::jthread>> threadPoolThreads;
// NOTE(review): presumably the indexes of idle pool threads - confirm.
std::set<int> availablePoolThreads;

// Queues of ExecutorTask.
// NOTE(review): presumably one queue per pool thread - confirm.
std::vector<faabric::util::Queue<ExecutorTask>> threadTaskQueues;

// Takes a stop token and a pool index.
// NOTE(review): presumably the function run by each pool jthread - confirm.
void threadPoolThread(std::stop_token st, int threadPoolIdx);
};

/**
* Background thread that periodically checks to see if any executors have
* become stale (i.e. not handled any requests in a given timeout). If any are
Expand Down Expand Up @@ -185,19 +55,10 @@ class Scheduler

long getFunctionExecutorCount(const faabric::Message& msg);

void flushLocally();

// ----------------------------------
// Message results
// ----------------------------------

void setFunctionResult(faabric::Message& msg);

void setThreadResult(faabric::Message& msg,
int32_t returnValue,
const std::string& key,
const std::vector<faabric::util::SnapshotDiff>& diffs);

/**
* Caches a message along with the thread result, to allow the thread result
* to refer to data held in that message (i.e. snapshot diffs). The message
Expand Down Expand Up @@ -250,7 +111,9 @@ class Scheduler
std::atomic<bool> _isShutdown = false;

// ---- Executors ----
std::unordered_map<std::string, std::vector<std::shared_ptr<Executor>>>
std::unordered_map<
std::string,
std::vector<std::shared_ptr<faabric::executor::Executor>>>
executors;

// ---- Threads ----
Expand All @@ -265,7 +128,7 @@ class Scheduler
// ---- Actual scheduling ----
SchedulerReaperThread reaperThread;

std::shared_ptr<Executor> claimExecutor(
std::shared_ptr<faabric::executor::Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);

Expand Down
Loading

0 comments on commit 628e19b

Please sign in to comment.