Skip to content

Commit

Permalink
transport(tcp): make send sockets blocking (#424)
Browse files Browse the repository at this point in the history
Under high-load we are seeing some ::send operations fail with
EAGAIN/EWOULDBLOCK. This is because we set send sockets as non-blocking,
but we do not handle the EAGAIN/EWOULDBLOCK situation in the send loop.
As a first iteration, we attempt setting the send sockets as blocking.

Note that, irrespective of wether send sockets block or not, partial
writes may still happen.
  • Loading branch information
csegarragonz authored Apr 16, 2024
1 parent bb8c00e commit a56f111
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/transport/tcp/RecvSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#include <faabric/transport/tcp/SocketOptions.h>
#include <faabric/util/logging.h>

#ifdef FAABRIC_USE_SPINLOCK
#include <emmintrin.h>
#endif
#include <poll.h>

namespace faabric::transport::tcp {
Expand Down
31 changes: 23 additions & 8 deletions src/transport/tcp/SendSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include <faabric/util/macros.h>

#include <arpa/inet.h>
#ifdef FAABRIC_USE_SPINLOCK
#include <emmintrin.h>
#endif
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
Expand All @@ -20,7 +23,14 @@ void SendSocket::setSocketOptions(int connFd)
{
setNoDelay(connFd);
setQuickAck(connFd);
#ifdef FAABRIC_USE_SPINLOCK
setNonBlocking(connFd);
#else
// If we decide to make send sockets non-blocking, then in the sendOne loop
// we need to treat the special case where we return EAGAIN or EWOULDBLOCK
setBlocking(connFd);
setSendTimeoutMs(connFd, SocketTimeoutMs);
#endif
}

void SendSocket::dial()
Expand Down Expand Up @@ -73,14 +83,19 @@ void SendSocket::sendOne(const uint8_t* buffer, size_t bufferSize)
while (totalNumSent < bufferSize) {
size_t nSent = ::send(sock.get(), buffer, bufferSize - totalNumSent, 0);
if (nSent == -1) {
SPDLOG_ERROR(
"TCP client error sending TCP message to {}:{} ({}/{}): {}",
host,
port,
nSent,
bufferSize,
std::strerror(errno));
throw std::runtime_error("TCP client error sending message!");
#ifdef FAABRIC_USE_SPINLOCK
if (errno == EAGAIN || errno == EWOULDBLOCK) {
_mm_pause();
continue;
};
#endif
SPDLOG_ERROR("Error error sending TCP message to {}:{} ({}/{}): {}",
host,
port,
totalNumSent,
bufferSize,
std::strerror(errno));
throw std::runtime_error("Error sending TCP message!");
}

buffer += nSent;
Expand Down

0 comments on commit a56f111

Please sign in to comment.