Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: backport bitcoin#24356, extend -socketevents to Sock, implement Sock::WaitMany{Epoll, KQueue} #6018

Draft
wants to merge 8 commits into
base: develop
Choose a base branch
from
Prev Previous commit
Next Next commit
net: implement WaitMany variants for {epoll, kqueue}
Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
kwvg and UdjinM6 committed Sep 26, 2024
commit 6ffc83292dbf153f6020e93748ebd8730ce521b4
2 changes: 1 addition & 1 deletion src/net.cpp
Original file line number Diff line number Diff line change
@@ -2388,7 +2388,7 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
// select(2)). If none are ready, wait for a short while and return
// empty sets.
events_per_sock = GenerateWaitSockets(snap.Nodes());
if ((is_lt && events_per_sock.empty()) || !Sock::IWaitMany(socketEventsMode, timeout, events_per_sock)) {
if ((is_lt && events_per_sock.empty()) || !Sock::IWaitMany(socketEventsMode, GetModeFileDescriptor(), timeout, events_per_sock)) {
if (is_lt) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
}
112 changes: 97 additions & 15 deletions src/util/sock.cpp
Original file line number Diff line number Diff line change
@@ -19,6 +19,14 @@
#include <locale>
#endif

#ifdef USE_EPOLL
#include <sys/epoll.h>
#endif

#ifdef USE_KQUEUE
#include <sys/event.h>
#endif

#ifdef USE_POLL
#include <poll.h>
#endif
@@ -155,7 +163,9 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur
{
EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})};

if (!WaitMany(timeout, events_per_sock)) {
/* We need to specify that we only want level-triggered modes used because we are expecting
a direct correlation between the events reported and the one socket we are querying */
if (!IWaitMany(m_event_mode, m_fd_mode, timeout, events_per_sock, /* lt_only = */ true)) {
return false;
}

@@ -168,13 +178,12 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur

bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
{
return IWaitMany(m_event_mode, timeout, events_per_sock);
return IWaitMany(m_event_mode, m_fd_mode, timeout, events_per_sock);
}

bool Sock::IWaitMany(SocketEventsMode event_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
bool Sock::IWaitMany(SocketEventsMode event_mode, std::optional<int> fd_mode, std::chrono::milliseconds timeout,
EventsPerSock& events_per_sock, bool lt_only)
{
std::string debug_str;

switch (event_mode)
{
#ifdef USE_POLL
@@ -183,28 +192,101 @@ bool Sock::IWaitMany(SocketEventsMode event_mode, std::chrono::milliseconds time
#endif /* USE_POLL */
case SocketEventsMode::Select:
return WaitManySelect(timeout, events_per_sock);
#ifdef USE_EPOLL
case SocketEventsMode::EPoll:
debug_str += "Unimplemented for epoll, falling back on ";
break;
/* If we explictly need to use a level triggered mode, skip below to fallback */
if (lt_only) break;
assert(fd_mode);
return WaitManyEPoll(fd_mode.value(), timeout, events_per_sock);
#endif /* USE_EPOLL */
#ifdef USE_KQUEUE
case SocketEventsMode::KQueue:
debug_str += "Unimplemented for kqueue, falling back on ";
break;
/* If we explictly need to use a level triggered mode, skip below to fallback */
if (lt_only) break;
assert(fd_mode);
return WaitManyKQueue(fd_mode.value(), timeout, events_per_sock);
#endif /* USE_KQUEUE */
default:
assert(false);
}
#ifdef USE_POLL
debug_str += "poll";
#else
debug_str += "select";
#endif /* USE_POLL*/
LogPrintf("%s\n", debug_str);
/* We should only be here if we need a level-triggered mode but the instance is edge-triggered instead */
assert(lt_only);
#ifdef USE_POLL
return WaitManyPoll(timeout, events_per_sock);
#else
return WaitManySelect(timeout, events_per_sock);
#endif /* USE_POLL */
}

#ifdef USE_EPOLL
bool Sock::WaitManyEPoll(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
{
std::array<epoll_event, MAX_EVENTS> events;

int ret = epoll_wait(fd_mode, events.data(), events.size(), count_milliseconds(timeout));
if (ret == SOCKET_ERROR) {
return false;
}

/* Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the
entire map before populating it with our events data. */
events_per_sock.clear();

for (int idx = 0; idx < ret; idx++) {
auto& ev = events[idx];
Event occurred = 0;
if (ev.events & (EPOLLERR | EPOLLHUP)) {
occurred |= ERR;
} else {
if (ev.events & EPOLLIN) {
occurred |= RECV;
}
if (ev.events & EPOLLOUT) {
occurred |= SEND;
}
}
events_per_sock.emplace(static_cast<SOCKET>(ev.data.fd), Sock::Events{occurred, occurred});
}

return true;
}
#endif /* USE_EPOLL */

#ifdef USE_KQUEUE
bool Sock::WaitManyKQueue(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
{
std::array<struct kevent, MAX_EVENTS> events;
struct timespec ts = MillisToTimespec(timeout);

int ret = kevent(fd_mode, nullptr, 0, events.data(), events.size(), &ts);
if (ret == SOCKET_ERROR) {
return false;
}

/* Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the
entire map before populating it with our events data. */
events_per_sock.clear();

for (int idx = 0; idx < ret; idx++) {
auto& ev = events[idx];
Event occurred = 0;
if (ev.flags & (EV_ERROR | EV_EOF)) {
occurred |= ERR;
} else {
if (ev.filter == EVFILT_READ) {
occurred |= RECV;
}
if (ev.filter == EVFILT_WRITE) {
occurred |= SEND;
}
}
events_per_sock.emplace(static_cast<SOCKET>(ev.ident), Sock::Events{occurred, occurred});
}

return true;
}
#endif /* USE_KQUEUE */

#ifdef USE_POLL
bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
{
11 changes: 9 additions & 2 deletions src/util/sock.h
Original file line number Diff line number Diff line change
@@ -230,7 +230,7 @@ class Sock
* Auxiliary requested/occurred events to wait for in `WaitMany()`.
*/
struct Events {
explicit Events(Event req) : requested{req}, occurred{0} {}
explicit Events(Event req, Event ocr = 0) : requested{req}, occurred{ocr} {}
Event requested;
Event occurred;
};
@@ -273,7 +273,14 @@ class Sock
* This doesn't apply to Sock::Wait(), as it populates an EventsPerSock map with its own raw
* socket before passing it to WaitMany.
*/
static bool IWaitMany(SocketEventsMode event_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
static bool IWaitMany(SocketEventsMode event_mode, std::optional<int> fd_mode, std::chrono::milliseconds timeout,
EventsPerSock& events_per_sock, bool lt_only = false);
#ifdef USE_EPOLL
static bool WaitManyEPoll(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
#endif /* USE_EPOLL */
#ifdef USE_KQUEUE
static bool WaitManyKQueue(int fd_mode, std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
#endif /* USE_KQUEUE */
#ifdef USE_POLL
static bool WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
#endif /* USE_POLL */