Skip to content

Commit

Permalink
mpi(hwloc): spin-lock receiver threads and pin to a core
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Apr 8, 2024
1 parent d94f61a commit 7ad4144
Show file tree
Hide file tree
Showing 25 changed files with 467 additions and 86 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ jobs:
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.16.0
env:
DEPLOYMENT_TYPE: gha-ci
steps:
- name: "Check-out code"
uses: actions/checkout@v4
Expand Down Expand Up @@ -59,7 +61,7 @@ jobs:
needs: [conan-cache]
runs-on: ubuntu-latest
env:
HOST_TYPE: ci
DEPLOYMENT_TYPE: gha-ci
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
Expand Down Expand Up @@ -98,7 +100,7 @@ jobs:
matrix:
sanitiser: [None, Address, Thread, Undefined]
env:
HOST_TYPE: ci
DEPLOYMENT_TYPE: gha-ci
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
Expand Down Expand Up @@ -145,7 +147,7 @@ jobs:
needs: [conan-cache]
runs-on: ubuntu-latest
env:
HOST_TYPE: ci
DEPLOYMENT_TYPE: gha-ci
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
Expand Down
14 changes: 12 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ option(FAABRIC_BUILD_TESTS "Build Faabric tests" ON)
option(FAABRIC_SELF_TRACING "Turn on system tracing using the logger" OFF)
option(FAABRIC_CODE_COVERAGE "Build Faabric with code coverage profiling" OFF)
option(FAABRIC_TARGET_CPU "CPU to optimise for, e.g. skylake, icelake or native" OFF)
option(FAABRIC_USE_SPINLOCK "Use spinlocks for low-latency messaging" ON)

# Enable colorized compiler output
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
Expand All @@ -16,10 +17,10 @@ endif()

# Optimise for CPU
if(FAABRIC_TARGET_CPU)
message(STATUS "Optimising Faabric for CPU ${FAABRIC_TARGET_CPU}")
message(STATUS "Faabric: optimising for CPU ${FAABRIC_TARGET_CPU}")
add_compile_options(-march=${FAABRIC_TARGET_CPU} -mtune=${FAABRIC_TARGET_CPU})
else()
message(STATUS "Faabric not optimised for specific CPU")
message(STATUS "Faabric: not optimised for specific CPU")
endif()

# Top-level CMake config
Expand All @@ -38,6 +39,15 @@ if(${FAABRIC_SELF_TRACING})
add_definitions(-DTRACE_ALL=1)
endif()

# We want to disable the usage of spinlocks (and CPU pinning) in GHA runners
# as they have a very low number of vCPU cores
if (${FAABRIC_USE_SPINLOCK} AND NOT "$ENV{DEPLOYMENT_TYPE}" STREQUAL "gha-ci")
message(STATUS "Faabric: enabled spin-locks")
add_definitions(-DFAABRIC_USE_SPINLOCK)
else()
message(STATUS "Faabric: disabled spin-locks")
endif()

# Set-up use of sanitisers
if (FAABRIC_USE_SANITISER STREQUAL "Address")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
Expand Down
45 changes: 27 additions & 18 deletions cmake/ExternalProjects.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ if(NOT EXISTS "${CMAKE_CURRENT_BINARY_DIR}/conan.cmake")
TLS_VERIFY ON)
endif()

set(CONAN_CMAKE_SILENT_OUTPUT ON CACHE INTERNAL "")
include(${CMAKE_CURRENT_BINARY_DIR}/conan.cmake)

conan_check(VERSION 1.63.0 REQUIRED)
Expand Down Expand Up @@ -73,19 +74,19 @@ conan_cmake_install(PATH_OR_REFERENCE .

include(${CMAKE_CURRENT_BINARY_DIR}/conan_paths.cmake)

find_package(absl REQUIRED)
find_package(Boost 1.80.0 REQUIRED)
find_package(Catch2 REQUIRED)
find_package(flatbuffers REQUIRED)
find_package(fmt REQUIRED)
find_package(hiredis REQUIRED)
find_package(absl QUIET REQUIRED)
find_package(Boost 1.80.0 QUIET REQUIRED)
find_package(Catch2 QUIET REQUIRED)
find_package(flatbuffers QUIET REQUIRED)
find_package(fmt QUIET REQUIRED)
find_package(hiredis QUIET REQUIRED)
# 27/01/2023 - Pin OpenSSL to a specific version to avoid incompatibilities
# with the system's (i.e. Ubuntu 22.04) OpenSSL
find_package(OpenSSL 3.0.2 REQUIRED)
find_package(Protobuf 3.20.0 REQUIRED)
find_package(readerwriterqueue REQUIRED)
find_package(spdlog REQUIRED)
find_package(ZLIB REQUIRED)
find_package(OpenSSL 3.0.2 QUIET REQUIRED)
find_package(Protobuf 3.20.0 QUIET REQUIRED)
find_package(readerwriterqueue QUIET REQUIRED)
find_package(spdlog QUIET REQUIRED)
find_package(ZLIB QUIET REQUIRED)

# --------------------------------
# Fetch content dependencies
Expand All @@ -109,16 +110,26 @@ set(ZSTD_LZ4_SUPPORT OFF CACHE INTERNAL "")
# nng (Conan version out of date)
set(NNG_TESTS OFF CACHE INTERNAL "")

FetchContent_Declare(zstd_ext
GIT_REPOSITORY "https://github.com/facebook/zstd"
GIT_TAG "v1.5.2"
SOURCE_SUBDIR "build/cmake"
FetchContent_Declare(atomic_queue_ext
GIT_REPOSITORY "https://github.com/max0x7ba/atomic_queue"
GIT_TAG "7c36f0997979a0fee5be84c9511ee0f6032057ec"
)
FetchContent_Declare(nng_ext
GIT_REPOSITORY "https://github.com/nanomsg/nng"
# NNG tagged version 1.7.1
GIT_TAG "ec4b5722fba105e3b944e3dc0f6b63c941748b3f"
)
FetchContent_Declare(zstd_ext
GIT_REPOSITORY "https://github.com/facebook/zstd"
GIT_TAG "v1.5.2"
SOURCE_SUBDIR "build/cmake"
)

FetchContent_MakeAvailable(atomic_queue_ext)
add_library(atomic_queue::atomic_queue ALIAS atomic_queue)

FetchContent_MakeAvailable(nng_ext)
add_library(nng::nng ALIAS nng)

FetchContent_MakeAvailable(zstd_ext)
# Work around zstd not declaring its targets properly
Expand All @@ -127,9 +138,6 @@ target_include_directories(libzstd_shared SYSTEM INTERFACE $<BUILD_INTERFACE:${z
add_library(zstd::libzstd_static ALIAS libzstd_static)
add_library(zstd::libzstd_shared ALIAS libzstd_shared)

FetchContent_MakeAvailable(nng_ext)
add_library(nng::nng ALIAS nng)

# Group all external dependencies into a convenient virtual CMake library
add_library(faabric_common_dependencies INTERFACE)
target_include_directories(faabric_common_dependencies INTERFACE
Expand All @@ -141,6 +149,7 @@ target_link_libraries(faabric_common_dependencies INTERFACE
absl::flat_hash_set
absl::flat_hash_map
absl::strings
atomic_queue::atomic_queue
Boost::Boost
Boost::system
flatbuffers::flatbuffers
Expand Down
1 change: 1 addition & 0 deletions dist-test/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pushd ${PROJ_ROOT} >> /dev/null
# Run the build
docker compose \
run \
-e FAABRIC_DEPLOYMENT_TYPE=gha-ci \
--rm \
cli \
/code/faabric/dist-test/build_internal.sh
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ services:
tty: true
privileged: true
environment:
- DEPLOYMENT_TYPE=${FAABRIC_DEPLOYMENT_TYPE:-compose}
- LOG_LEVEL=${LOG_LEVEL:-debug}
- PLANNER_HOST=planner
- PLANNER_PORT=8080
Expand All @@ -48,6 +49,7 @@ services:
- ./conan-cache/:/root/.conan
working_dir: /build/faabric/static
environment:
- DEPLOYMENT_TYPE=${FAABRIC_DEPLOYMENT_TYPE:-compose}
- LOG_LEVEL=debug
- PLANNER_HOST=planner
- PLANNER_PORT=8080
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ namespace faabric::mpi {
// as the broker already has mocking capabilities
std::vector<MpiMessage> getMpiMockedMessages(int sendRank);

#ifdef FAABRIC_USE_SPINLOCK
typedef faabric::util::SpinLockQueue<MpiMessage> InMemoryMpiQueue;
#else
typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue;
#endif

class MpiWorld
{
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/transport/tcp/RecvSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ class RecvSocket
int port;

std::deque<int> openConnections;

void setSocketOptions(int connFd);
};
}
5 changes: 4 additions & 1 deletion include/faabric/transport/tcp/SocketOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ void setReuseAddr(int connFd);
void setNoDelay(int connFd);
void setQuickAck(int connFd);

// Blocking/Non-blocking sockets
void setNonBlocking(int connFd);
void setBlocking(int connFd);

bool isNonBlocking(int connFd);

// Enable busy polling for non-blocking sockets
void setBusyPolling(int connFd);
}
1 change: 1 addition & 0 deletions include/faabric/util/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class SystemConfig

// Scheduling
int overrideCpuCount;
int overrideFreeCpuStart;
std::string batchSchedulerMode;

// Worker-related timeouts
Expand Down
32 changes: 32 additions & 0 deletions include/faabric/util/hwloc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <memory>
#include <pthread.h>

namespace faabric::util {

const int NO_CPU_IDX = -1;
const int GHA_CPU_IDX = -2;

class FaabricCpuSet
{
public:
FaabricCpuSet(int cpuIdxIn = NO_CPU_IDX);
FaabricCpuSet& operator=(const FaabricCpuSet&) = delete;
FaabricCpuSet(const FaabricCpuSet&) = delete;

~FaabricCpuSet();

cpu_set_t* get() { return &cpuSet; }

private:
cpu_set_t cpuSet;

// CPU index in internal CPU accounting
int cpuIdx = NO_CPU_IDX;
};

// Pin thread to any "unpinned" CPUs. Returns the CPU set it was pinned to.
// We return a unique pointer to enforce RAII on the pinned-to CPU
std::unique_ptr<FaabricCpuSet> pinThreadToFreeCpu(pthread_t thread);
}
27 changes: 27 additions & 0 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>

#include <atomic_queue/atomic_queue.h>
#include <condition_variable>
#include <queue>
#include <readerwriterqueue/readerwritercircularbuffer.h>
Expand Down Expand Up @@ -215,6 +216,32 @@ class FixedCapacityQueue
moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
};

template<typename T>
class SpinLockQueue
{
public:
void enqueue(T& value) { mq.push(value); }

T dequeue() { return mq.pop(); }

long size()
{
throw std::runtime_error("Size for fast queue unimplemented!");
}

void drain()
{
while (mq.pop()) {
;
}
}

void reset() { ; }

private:
atomic_queue::AtomicQueue2<T, 1024, true, true, false, true> mq;
};

class TokenPool
{
public:
Expand Down
12 changes: 12 additions & 0 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <faabric/util/batch.h>
#include <faabric/util/environment.h>
#include <faabric/util/gids.h>
#include <faabric/util/hwloc.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>
#include <faabric/util/testing.h>
Expand Down Expand Up @@ -52,6 +53,9 @@ struct MpiRankState
// have not `wait`ed on yet.
std::vector<std::unique_ptr<std::list<MpiMessage>>> unackedMessageBuffers;

// CPU that this thread is pinned to
std::unique_ptr<faabric::util::FaabricCpuSet> pinnedCpu;

// ----- Remote Messaging -----

// This structure contains one send socket per remote rank
Expand Down Expand Up @@ -89,6 +93,9 @@ struct MpiRankState
recvSocket.reset();
recvConnPool.clear();

// Free the pinned-to CPU
pinnedCpu.reset();

// Local message count
msgCount = 1;

Expand Down Expand Up @@ -254,6 +261,11 @@ void MpiWorld::initialiseRankFromMsg(faabric::Message& msg)
{
rankState.msg = &msg;

// Pin this thread to a free CPU
if (rankState.pinnedCpu == nullptr) {
rankState.pinnedCpu = faabric::util::pinThreadToFreeCpu(pthread_self());
}

// Initialise the TCP sockets for remote messaging
initSendRecvSockets();
}
Expand Down
Loading

0 comments on commit 7ad4144

Please sign in to comment.