Skip to content

Commit

Permalink
net: implement ToggleWakeupPipe in all WaitMany variants
Browse files Browse the repository at this point in the history
`ToggleWakeupPipe` code was removed in bitcoin#24356 as part of migration
from `CConnman::SocketEvents` to `Sock::WaitMany` and because `Sock` didn't
have the necessary plumbing.
  • Loading branch information
kwvg committed May 29, 2024
1 parent ad07a06 commit 7185762
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 29 deletions.
9 changes: 8 additions & 1 deletion src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
return nullptr;
}

// Set toggle wakeup pipe function to be called in between WakeMany()'s API call
sock->SetWrapFn(ToggleWakeupPipe);

// Add node
NodeId id = GetNewNodeId();
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
Expand Down Expand Up @@ -1797,7 +1800,8 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
// select(2)). If none are ready, wait for a short while and return
// empty sets.
events_per_sock = GenerateWaitSockets(snap.Nodes());
if (events_per_sock.empty() || !Sock::IWaitMany(socketEventsMode, GetModeFileDescriptor(), timeout, events_per_sock)) {
if (events_per_sock.empty() || !Sock::IWaitMany(socketEventsMode, GetModeFileDescriptor(),
ToggleWakeupPipe, timeout, events_per_sock)) {
if ((socketEventsMode == SocketEventsMode::Poll && !only_poll) ||
socketEventsMode == SocketEventsMode::Select)
{
Expand Down Expand Up @@ -3078,6 +3082,9 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError,
return false;
}

// Set toggle wakeup pipe function to be called in between WakeMany()'s API call
sock->SetWrapFn(ToggleWakeupPipe);

if (m_edge_trig_events && !m_edge_trig_events->AddSocket(sock->Get())) {
LogPrintf("Error: EdgeTriggeredEvents::AddSocket() failed\n");
return false;
Expand Down
6 changes: 2 additions & 4 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1508,15 +1508,13 @@ friend class CNode;
return std::nullopt;
}

template <typename Callable>
void ToggleWakeupPipe(Callable&& func)
{
wrap_fn ToggleWakeupPipe = [&](std::function<void()>&& func) {
if (m_wakeup_pipe) {
m_wakeup_pipe->Toggle(func);
} else {
func();
}
}
};

Mutex cs_sendable_receivable_nodes;
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);
Expand Down
57 changes: 39 additions & 18 deletions src/util/sock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur

/* We need to specify that we only want level-triggered modes used because we are expecting
a direct correlation between the events reported and the one socket we are querying */
if (!IWaitMany(m_event_mode, m_fd_mode, timeout, events_per_sock, /* lt_only = */ true)) {
if (!IWaitMany(m_event_mode, m_fd_mode, m_wrap_func, timeout, events_per_sock, /* lt_only = */ true)) {
return false;
}

Expand All @@ -163,52 +163,56 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur

bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
{
return IWaitMany(m_event_mode, m_fd_mode, timeout, events_per_sock);
return IWaitMany(m_event_mode, m_fd_mode, m_wrap_func, timeout, events_per_sock);
}

bool Sock::IWaitMany(SocketEventsMode event_mode, std::optional<int> fd_mode, std::chrono::milliseconds timeout,
EventsPerSock& events_per_sock, bool lt_only)
bool Sock::IWaitMany(SocketEventsMode event_mode, std::optional<int> fd_mode, wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, bool lt_only)
{
switch (event_mode)
{
#ifdef USE_POLL
case SocketEventsMode::Poll:
return WaitManyPoll(timeout, events_per_sock);
return WaitManyPoll(wrap_func, timeout, events_per_sock);
#endif /* USE_POLL */
case SocketEventsMode::Select:
return WaitManySelect(timeout, events_per_sock);
return WaitManySelect(wrap_func, timeout, events_per_sock);
#ifdef USE_EPOLL
case SocketEventsMode::EPoll:
/* If we explictly need to use a level triggered mode, skip below to fallback */
if (lt_only) break;
assert(fd_mode);
return WaitManyEPoll(fd_mode.value(), timeout, events_per_sock);
return WaitManyEPoll(fd_mode.value(), wrap_func, timeout, events_per_sock);
#endif /* USE_EPOLL */
#ifdef USE_KQUEUE
case SocketEventsMode::KQueue:
/* If we explictly need to use a level triggered mode, skip below to fallback */
if (lt_only) break;
assert(fd_mode);
return WaitManyKQueue(fd_mode.value(), timeout, events_per_sock);
return WaitManyKQueue(fd_mode.value(), wrap_func, timeout, events_per_sock);
#endif /* USE_KQUEUE */
default:
assert(false);
}
/* We should only be here if we need a level-triggered mode but the instance is edge-triggered instead */
assert(lt_only);
#ifdef USE_POLL
return WaitManyPoll(timeout, events_per_sock);
return WaitManyPoll(wrap_func, timeout, events_per_sock);
#else
return WaitManySelect(timeout, events_per_sock);
return WaitManySelect(wrap_func, timeout, events_per_sock);
#endif /* USE_POLL */
}

#ifdef USE_EPOLL
bool Sock::WaitManyEPoll(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
bool Sock::WaitManyEPoll(int fd_mode, wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
{
std::array<epoll_event, MAX_EVENTS> events;

int ret = epoll_wait(fd_mode, events.data(), events.size(), count_milliseconds(timeout));
int ret{SOCKET_ERROR};
wrap_func([&](){
ret = epoll_wait(fd_mode, events.data(), events.size(), count_milliseconds(timeout));
});
if (ret == SOCKET_ERROR) {
return false;
}
Expand Down Expand Up @@ -238,12 +242,16 @@ bool Sock::WaitManyEPoll(int fd_mode, std::chrono::milliseconds timeout, EventsP
#endif /* USE_EPOLL */

#ifdef USE_KQUEUE
bool Sock::WaitManyKQueue(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
bool Sock::WaitManyKQueue(int fd_mode, wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
{
std::array<struct kevent, MAX_EVENTS> events;
struct timespec ts = MillisToTimespec(timeout);

int ret = kevent(fd_mode, nullptr, 0, events.data(), events.size(), &ts);
int ret{SOCKET_ERROR};
wrap_func([&](){
ret = kevent(fd_mode, nullptr, 0, events.data(), events.size(), &ts);
});
if (ret == SOCKET_ERROR) {
return false;
}
Expand Down Expand Up @@ -273,7 +281,7 @@ bool Sock::WaitManyKQueue(int fd_mode, std::chrono::milliseconds timeout, Events
#endif /* USE_KQUEUE */

#ifdef USE_POLL
bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
bool Sock::WaitManyPoll(wrap_fn wrap_func, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
{
std::vector<pollfd> pfds;
for (const auto& [socket, events] : events_per_sock) {
Expand All @@ -288,7 +296,11 @@ bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events
}
}

if (poll(pfds.data(), pfds.size(), count_milliseconds(timeout)) == SOCKET_ERROR) {
int ret{SOCKET_ERROR};
wrap_func([&](){
ret = poll(pfds.data(), pfds.size(), count_milliseconds(timeout));
});
if (ret == SOCKET_ERROR) {
return false;
}

Expand All @@ -313,7 +325,7 @@ bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events
}
#endif /* USE_POLL */

bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
bool Sock::WaitManySelect(wrap_fn wrap_func, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
{
fd_set recv;
fd_set send;
Expand All @@ -340,7 +352,11 @@ bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& even

timeval tv = MillisToTimeval(timeout);

if (select(socket_max + 1, &recv, &send, &err, &tv) == SOCKET_ERROR) {
int ret{SOCKET_ERROR};
wrap_func([&](){
ret = select(socket_max + 1, &recv, &send, &err, &tv);
});
if (ret == SOCKET_ERROR) {
return false;
}

Expand All @@ -361,6 +377,11 @@ bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& even
return true;
}

void Sock::SetWrapFn(wrap_fn wrap_func)
{
m_wrap_func = wrap_func;
}

void Sock::SendComplete(const std::string& data,
std::chrono::milliseconds timeout,
CThreadInterrupt& interrupt) const
Expand Down
24 changes: 18 additions & 6 deletions src/util/sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <util/time.h>

#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <string>
Expand Down Expand Up @@ -70,6 +71,8 @@ static SocketEventsMode SEMFromString(const std::string str)
else { return SocketEventsMode::Unknown; }
}

using wrap_fn = std::function<void(std::function<void()>&&)>;

/**
* RAII helper class that manages a socket. Mimics `std::unique_ptr`, but instead of a pointer it
* contains a socket and closes it automatically when it goes out of scope.
Expand Down Expand Up @@ -254,18 +257,25 @@ class Sock
* This doesn't apply to Sock::Wait(), as it populates an EventsPerSock map with its own raw
* socket before passing it to WaitMany.
*/
static bool IWaitMany(SocketEventsMode event_mode, std::optional<int> fd_mode, std::chrono::milliseconds timeout,
EventsPerSock& events_per_sock, bool lt_only = false);
static bool IWaitMany(SocketEventsMode event_mode, std::optional<int> fd_mode, wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, bool lt_only = false);
#ifdef USE_EPOLL
static bool WaitManyEPoll(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
static bool WaitManyEPoll(int fd_mode, wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
#endif /* USE_EPOLL */
#ifdef USE_KQUEUE
static bool WaitManyKQueue(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
static bool WaitManyKQueue(int fd_mode, wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
#endif /* USE_KQUEUE */
#ifdef USE_POLL
static bool WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
static bool WaitManyPoll(wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
#endif /* USE_POLL */
static bool WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
static bool WaitManySelect(wrap_fn wrap_func,
std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);

/* Set function wrapped around WaitMany()'s API call */
void SetWrapFn(wrap_fn wrap_func);

/* Higher level, convenience, methods. These may throw. */

Expand Down Expand Up @@ -314,6 +324,8 @@ class Sock
SocketEventsMode m_event_mode{SocketEventsMode::Unknown};
/* Optional containing file descriptor for applicable event modes */
std::optional<int> m_fd_mode{std::nullopt};
/* Function that wraps itself around WakeMany()'s API call */
wrap_fn m_wrap_func{[](std::function<void()>&& func){func();}};
};

/** Return readable error string for a network error code */
Expand Down

0 comments on commit 7185762

Please sign in to comment.