From 3b11ef9b89eff6a335a1f2e21fc15502193c0d20 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Tue, 14 May 2024 17:23:14 +0000 Subject: [PATCH 1/9] refactor: move `CConnman::SocketEventsMode` to `util/sock.h` --- src/init.cpp | 20 +++----------------- src/net.cpp | 36 ++++++++++++++++++------------------ src/net.h | 9 +-------- src/rpc/net.cpp | 21 +++------------------ src/util/sock.h | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 61 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index d52ab19225c17..c88a6c0af2666 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -2515,23 +2515,9 @@ bool AppInitMain(const CoreContext& context, NodeContext& node, interfaces::Bloc } } - std::string strSocketEventsMode = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); - if (strSocketEventsMode == "select") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_SELECT; -#ifdef USE_POLL - } else if (strSocketEventsMode == "poll") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_POLL; -#endif -#ifdef USE_EPOLL - } else if (strSocketEventsMode == "epoll") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_EPOLL; -#endif -#ifdef USE_KQUEUE - } else if (strSocketEventsMode == "kqueue") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_KQUEUE; -#endif - } else { - return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), strSocketEventsMode, GetSupportedSocketEventsStr())); + std::string sem_str = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); + if (SEMFromString(sem_str) == SocketEventsMode::Unknown) { + return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr())); } const std::string& i2psam_arg = args.GetArg("-i2psam", ""); diff --git a/src/net.cpp b/src/net.cpp index 90957d5f5327c..28f9477f9af50 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1782,21 +1782,21 @@ void CConnman::SocketEvents(const std::vector& nodes, { switch (socketEventsMode) { #ifdef USE_KQUEUE - case SOCKETEVENTS_KQUEUE: + case SocketEventsMode::KQueue: SocketEventsKqueue(recv_set, send_set, error_set, only_poll); break; #endif #ifdef USE_EPOLL - case SOCKETEVENTS_EPOLL: + case SocketEventsMode::EPoll: SocketEventsEpoll(recv_set, send_set, error_set, only_poll); break; #endif #ifdef USE_POLL - case SOCKETEVENTS_POLL: + case SocketEventsMode::Poll: SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll); break; #endif - case SOCKETEVENTS_SELECT: + case SocketEventsMode::Select: SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll); break; default: @@ -3133,7 +3133,7 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, } #ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { + if (socketEventsMode == SocketEventsMode::KQueue) { struct kevent event; EV_SET(&event, sock->Get(), EVFILT_READ, EV_ADD, 0, 0, nullptr); if (kevent(kqueuefd, &event, 1, nullptr, 0, nullptr) != 0) { @@ -3145,7 +3145,7 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, #endif #ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { + if (socketEventsMode == SocketEventsMode::EPoll) { epoll_event event; event.data.fd = sock->Get(); event.events = EPOLLIN; @@ -3302,7 +3302,7 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met Init(connOptions); #ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { + if (socketEventsMode == SocketEventsMode::KQueue) { kqueuefd = kqueue(); if (kqueuefd == -1) { LogPrintf("kqueue failed\n"); @@ -3312,7 +3312,7 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met #endif #ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { + if (socketEventsMode == SocketEventsMode::EPoll) { epollfd = epoll_create1(0); if (epollfd == -1) { LogPrintf("epoll_create1 failed\n"); @@ -3405,7 +3405,7 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); } #ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { + if (socketEventsMode == SocketEventsMode::KQueue) { struct kevent event; EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr); int r = kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); @@ -3417,7 +3417,7 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met } #endif #ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { + if (socketEventsMode == SocketEventsMode::EPoll) { epoll_event event; event.events = EPOLLIN; event.data.fd = wakeupPipe[0]; @@ -3567,14 +3567,14 @@ void CConnman::StopNodes() for (ListenSocket& hListenSocket : vhListenSocket) if (hListenSocket.socket != INVALID_SOCKET) { #ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { + if (socketEventsMode == SocketEventsMode::KQueue) { struct kevent event; EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); } #endif #ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { + if (socketEventsMode == SocketEventsMode::EPoll) { epoll_ctl(epollfd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr); } #endif @@ -3606,7 +3606,7 @@ void CConnman::StopNodes() semAddnode.reset(); #ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE && kqueuefd != -1) { + if (socketEventsMode == SocketEventsMode::KQueue && kqueuefd != -1) { #ifdef USE_WAKEUP_PIPE struct kevent event; EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr); @@ -3617,7 +3617,7 @@ void CConnman::StopNodes() kqueuefd = -1; #endif #ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL && epollfd != -1) { + if (socketEventsMode == SocketEventsMode::EPoll && epollfd != -1) { #ifdef USE_WAKEUP_PIPE epoll_ctl(epollfd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr); #endif @@ -4237,7 +4237,7 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const void CConnman::RegisterEvents(CNode *pnode) { #ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { + if (socketEventsMode == SocketEventsMode::KQueue) { LOCK(pnode->cs_hSocket); assert(pnode->hSocket != INVALID_SOCKET); @@ -4253,7 +4253,7 @@ void CConnman::RegisterEvents(CNode *pnode) } #endif #ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { + if (socketEventsMode == SocketEventsMode::EPoll) { LOCK(pnode->cs_hSocket); assert(pnode->hSocket != INVALID_SOCKET); @@ -4274,7 +4274,7 @@ void CConnman::RegisterEvents(CNode *pnode) void CConnman::UnregisterEvents(CNode *pnode) { #ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { + if (socketEventsMode == SocketEventsMode::KQueue) { AssertLockHeld(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) { return; @@ -4292,7 +4292,7 @@ void CConnman::UnregisterEvents(CNode *pnode) } #endif #ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { + if (socketEventsMode == SocketEventsMode::EPoll) { AssertLockHeld(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) { return; diff --git a/src/net.h b/src/net.h index a8b6505099564..9cb80b484d1fb 100644 --- a/src/net.h +++ b/src/net.h @@ -822,13 +822,6 @@ class CConnman { friend class CNode; public: - enum SocketEventsMode { - SOCKETEVENTS_SELECT = 0, - SOCKETEVENTS_POLL = 1, - SOCKETEVENTS_EPOLL = 2, - SOCKETEVENTS_KQUEUE = 3, - }; - struct Options { ServiceFlags nLocalServices = NODE_NONE; @@ -852,7 +845,7 @@ friend class CNode; bool m_use_addrman_outgoing = true; std::vector m_specified_outgoing; std::vector m_added_nodes; - SocketEventsMode socketEventsMode = SOCKETEVENTS_SELECT; + SocketEventsMode socketEventsMode = SocketEventsMode::Select; std::vector m_asmap; bool m_i2p_accept_incoming; }; diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index a5e42449958ae..5794c7c05b4d1 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -669,24 +669,9 @@ static RPCHelpMan getnetworkinfo() obj.pushKV("connections_mn", (int)node.connman->GetNodeCount(ConnectionDirection::Verified)); obj.pushKV("connections_mn_in", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedIn)); obj.pushKV("connections_mn_out", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedOut)); - std::string strSocketEvents; - switch (node.connman->GetSocketEventsMode()) { - case CConnman::SOCKETEVENTS_SELECT: - strSocketEvents = "select"; - break; - case CConnman::SOCKETEVENTS_POLL: - strSocketEvents = "poll"; - break; - case CConnman::SOCKETEVENTS_EPOLL: - strSocketEvents = "epoll"; - break; - case CConnman::SOCKETEVENTS_KQUEUE: - strSocketEvents = "kqueue"; - break; - default: - CHECK_NONFATAL(false); - } - obj.pushKV("socketevents", strSocketEvents); + std::string sem_str = SEMToString(node.connman->GetSocketEventsMode()); + CHECK_NONFATAL(sem_str != "unknown"); + obj.pushKV("socketevents", sem_str); } obj.pushKV("networks", GetNetworksInfo()); obj.pushKV("relayfee", ValueFromAmount(::minRelayTxFee.GetFeePerK())); diff --git a/src/util/sock.h b/src/util/sock.h index 8099c702a1c4d..324e0c763ed54 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -18,6 +18,54 @@ */ static constexpr auto MAX_WAIT_FOR_IO = 1s; +enum class SocketEventsMode : int8_t { + Select = 0, + Poll = 1, + EPoll = 2, + KQueue = 3, + + Unknown = -1 +}; + +/* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */ +static std::string SEMToString(const SocketEventsMode val) +{ + switch (val) { + case (SocketEventsMode::Select): + return "select"; +#ifdef USE_POLL + case (SocketEventsMode::Poll): + return "poll"; +#endif /* USE_POLL */ +#ifdef USE_EPOLL + case (SocketEventsMode::EPoll): + return "epoll"; +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + case (SocketEventsMode::KQueue): + return "kqueue"; +#endif /* USE_KQUEUE */ + default: + return "unknown"; + }; +} + +/* Converts string to SocketEventsMode value with additional check to report modes not compiled for as unknown */ +static SocketEventsMode SEMFromString(const std::string str) +{ + if (str == "select") { return SocketEventsMode::Select; } +#ifdef USE_POLL + else if (str == "poll") { return SocketEventsMode::Poll; } +#endif /* USE_POLL */ +#ifdef USE_EPOLL + else if (str == "epoll") { return SocketEventsMode::EPoll; } +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + else if (str == "kqueue") { return SocketEventsMode::KQueue; } +#endif /* USE_KQUEUE */ + else { return SocketEventsMode::Unknown; } +} + /** * RAII helper class that manages a socket. Mimics `std::unique_ptr`, but instead of a pointer it * contains a socket and closes it automatically when it goes out of scope. From 212df0677f7ef735a82453ad48ccdd24bf98e30c Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Tue, 14 May 2024 17:22:42 +0000 Subject: [PATCH 2/9] refactor: introduce `EdgeTriggeredEvents`, move {epoll, kqueue} fd there --- src/Makefile.am | 2 ++ src/net.cpp | 75 ++++++++++++++++++++--------------------------- src/net.h | 10 ++----- src/util/edge.cpp | 60 +++++++++++++++++++++++++++++++++++++ src/util/edge.h | 35 ++++++++++++++++++++++ 5 files changed, 131 insertions(+), 51 deletions(-) create mode 100644 src/util/edge.cpp create mode 100644 src/util/edge.h diff --git a/src/Makefile.am b/src/Makefile.am index 472d88aa2c9de..dadf9224ffa4c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -327,6 +327,7 @@ BITCOIN_CORE_H = \ util/bip32.h \ util/bytevectorhash.h \ util/check.h \ + util/edge.h \ util/enumerate.h \ util/epochguard.h \ util/error.h \ @@ -776,6 +777,7 @@ libbitcoin_util_a_SOURCES = \ util/bip32.cpp \ util/bytevectorhash.cpp \ util/check.cpp \ + util/edge.cpp \ util/error.cpp \ util/fees.cpp \ util/hasher.cpp \ diff --git a/src/net.cpp b/src/net.cpp index 28f9477f9af50..34a0aa93d73b0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1588,7 +1588,7 @@ void CConnman::SocketEventsKqueue(std::set& recv_set, timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; wakeupSelectNeeded = true; - int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout); + int n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout); wakeupSelectNeeded = false; if (n == -1) { LogPrintf("kevent wait error\n"); @@ -1622,7 +1622,7 @@ void CConnman::SocketEventsEpoll(std::set& recv_set, epoll_event events[maxEvents]; wakeupSelectNeeded = true; - int n = epoll_wait(epollfd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); + int n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); wakeupSelectNeeded = false; for (int i = 0; i < n; i++) { auto& e = events[i]; @@ -3136,8 +3136,8 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, if (socketEventsMode == SocketEventsMode::KQueue) { struct kevent event; EV_SET(&event, sock->Get(), EVFILT_READ, EV_ADD, 0, 0, nullptr); - if (kevent(kqueuefd, &event, 1, nullptr, 0, nullptr) != 0) { - strError = strprintf(_("Error: failed to add socket to kqueuefd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError())); + if (kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr) != 0) { + strError = strprintf(_("Error: failed to add socket to kqueue fd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError())); LogPrintf("%s\n", strError.original); return false; } @@ -3149,8 +3149,8 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, epoll_event event; event.data.fd = sock->Get(); event.events = EPOLLIN; - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) { - strError = strprintf(_("Error: failed to add socket to epollfd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError())); + if (epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) { + strError = strprintf(_("Error: failed to add socket to epoll fd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError())); LogPrintf("%s\n", strError.original); return false; } @@ -3301,25 +3301,14 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met AssertLockNotHeld(m_total_bytes_sent_mutex); Init(connOptions); -#ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue) { - kqueuefd = kqueue(); - if (kqueuefd == -1) { - LogPrintf("kqueue failed\n"); - return false; - } - } -#endif - -#ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll) { - epollfd = epoll_create1(0); - if (epollfd == -1) { - LogPrintf("epoll_create1 failed\n"); + if (socketEventsMode == SocketEventsMode::EPoll || socketEventsMode == SocketEventsMode::KQueue) { + m_edge_trig_events = std::make_unique(socketEventsMode); + if (!m_edge_trig_events->IsValid()) { + LogPrintf("Unable to initialize EdgeTriggeredEvents instance\n"); + m_edge_trig_events.reset(); return false; } } -#endif if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds, connOptions.onion_binds)) { if (clientInterface) { @@ -3408,10 +3397,10 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met if (socketEventsMode == SocketEventsMode::KQueue) { struct kevent event; EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr); - int r = kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); + int r = kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr); if (r != 0) { LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - kqueuefd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); + m_edge_trig_events->m_fd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); return false; } } @@ -3421,10 +3410,10 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met epoll_event event; event.events = EPOLLIN; event.data.fd = wakeupPipe[0]; - int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, wakeupPipe[0], &event); + int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], &event); if (r != 0) { LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - epollfd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); + m_edge_trig_events->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); return false; } } @@ -3570,12 +3559,12 @@ void CConnman::StopNodes() if (socketEventsMode == SocketEventsMode::KQueue) { struct kevent event; EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); + kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr); } #endif #ifdef USE_EPOLL if (socketEventsMode == SocketEventsMode::EPoll) { - epoll_ctl(epollfd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr); + epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr); } #endif if (!CloseSocket(hListenSocket.socket)) @@ -3606,24 +3595,22 @@ void CConnman::StopNodes() semAddnode.reset(); #ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue && kqueuefd != -1) { + if (socketEventsMode == SocketEventsMode::KQueue && Assert(m_edge_trig_events)->m_fd != -1) { #ifdef USE_WAKEUP_PIPE struct kevent event; EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); + kevent(m_edge_trig_events->m_fd, &event, 1, nullptr, 0, nullptr); #endif - close(kqueuefd); + m_edge_trig_events.reset(); } - kqueuefd = -1; #endif #ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll && epollfd != -1) { + if (socketEventsMode == SocketEventsMode::EPoll && Assert(m_edge_trig_events)->m_fd != -1) { #ifdef USE_WAKEUP_PIPE - epoll_ctl(epollfd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr); + epoll_ctl(m_edge_trig_events->m_fd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr); #endif - close(epollfd); + m_edge_trig_events.reset(); } - epollfd = -1; #endif #ifdef USE_WAKEUP_PIPE @@ -4245,10 +4232,10 @@ void CConnman::RegisterEvents(CNode *pnode) EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr); EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr); - int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr); + int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr); if (r != 0) { LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - kqueuefd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); + m_edge_trig_events->m_fd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); } } #endif @@ -4262,10 +4249,10 @@ void CConnman::RegisterEvents(CNode *pnode) e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; e.data.fd = pnode->hSocket; - int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, pnode->hSocket, &e); + int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, pnode->hSocket, &e); if (r != 0) { LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - epollfd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); + m_edge_trig_events->m_fd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); } } #endif @@ -4284,10 +4271,10 @@ void CConnman::UnregisterEvents(CNode *pnode) EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); - int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr); + int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr); if (r != 0) { LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - kqueuefd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError())); + m_edge_trig_events->m_fd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError())); } } #endif @@ -4298,10 +4285,10 @@ void CConnman::UnregisterEvents(CNode *pnode) return; } - int r = epoll_ctl(epollfd, EPOLL_CTL_DEL, pnode->hSocket, nullptr); + int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, pnode->hSocket, nullptr); if (r != 0) { LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - epollfd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError())); + m_edge_trig_events->m_fd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError())); } } #endif diff --git a/src/net.h b/src/net.h index 9cb80b484d1fb..fc4bac01ec809 100644 --- a/src/net.h +++ b/src/net.h @@ -28,9 +28,10 @@ #include #include #include +#include +#include #include #include -#include #include #include @@ -1514,12 +1515,7 @@ friend class CNode; std::atomic wakeupSelectNeeded{false}; SocketEventsMode socketEventsMode; -#ifdef USE_KQUEUE - int kqueuefd{-1}; -#endif -#ifdef USE_EPOLL - int epollfd{-1}; -#endif + std::unique_ptr m_edge_trig_events{nullptr}; Mutex cs_sendable_receivable_nodes; std::unordered_map mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); diff --git a/src/util/edge.cpp b/src/util/edge.cpp new file mode 100644 index 0000000000000..a813653607606 --- /dev/null +++ b/src/util/edge.cpp @@ -0,0 +1,60 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include + +#include + +#ifdef USE_EPOLL +#include +#endif + +#ifdef USE_KQUEUE +#include +#endif + +EdgeTriggeredEvents::EdgeTriggeredEvents(SocketEventsMode events_mode) + : m_mode(events_mode) +{ + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + m_fd = epoll_create1(0); + if (m_fd == -1) { + LogPrintf("Unable to initialize EdgeTriggeredEvents, epoll_create1 returned -1\n"); + return; + } +#else + LogPrintf("Attempting to initialize EdgeTriggeredEvents for epoll without support compiled in!\n"); + return; +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + m_fd = kqueue(); + if (m_fd == -1) { + LogPrintf("Unable to initialize EdgeTriggeredEvents, kqueue returned -1\n"); + return; + } +#else + LogPrintf("Attempting to initialize EdgeTriggeredEvents for kqueue without support compiled in!\n"); + return; +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + m_valid = true; +} + +EdgeTriggeredEvents::~EdgeTriggeredEvents() +{ + if (m_valid) { +#if defined(USE_KQUEUE) || defined(USE_EPOLL) + close(m_fd); +#else + assert(false); +#endif /* defined(USE_KQUEUE) || defined(USE_EPOLL) */ + } +} diff --git a/src/util/edge.h b/src/util/edge.h new file mode 100644 index 0000000000000..c4d38c3922db2 --- /dev/null +++ b/src/util/edge.h @@ -0,0 +1,35 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_EDGE_H +#define BITCOIN_UTIL_EDGE_H + +#include + +enum class SocketEventsMode : int8_t; + +/** + * A manager for abstracting logic surrounding edge-triggered socket events + * modes like kqueue and epoll. + */ +class EdgeTriggeredEvents +{ +public: + explicit EdgeTriggeredEvents(SocketEventsMode events_mode); + ~EdgeTriggeredEvents(); + + bool IsValid() const { return m_valid; } + +public: + /* File descriptor used to interact with events mode */ + int m_fd{-1}; + +private: + /* Instance validity flag set during construction */ + bool m_valid{false}; + /* Flag for storing selected socket events mode */ + SocketEventsMode m_mode; +}; + +#endif /* BITCOIN_UTIL_EDGE_H */ From 3a9f38613832d6e9de1e1e74887ecf3b6366aa1a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 10 May 2024 18:17:30 +0000 Subject: [PATCH 3/9] refactor: move `SOCKET` addition/removal from interest list to ETE Additionally, log errors if removal from interest list fails (which is possible if it was already removed or socket is invalid). --- src/net.cpp | 42 +++++------------------------ src/util/edge.cpp | 67 +++++++++++++++++++++++++++++++++++++++++++++++ src/util/edge.h | 7 +++++ 3 files changed, 81 insertions(+), 35 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 34a0aa93d73b0..fae20387a4968 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3132,30 +3132,10 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, return false; } -#ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue) { - struct kevent event; - EV_SET(&event, sock->Get(), EVFILT_READ, EV_ADD, 0, 0, nullptr); - if (kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr) != 0) { - strError = strprintf(_("Error: failed to add socket to kqueue fd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError())); - LogPrintf("%s\n", strError.original); - return false; - } - } -#endif - -#ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll) { - epoll_event event; - event.data.fd = sock->Get(); - event.events = EPOLLIN; - if (epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) { - strError = strprintf(_("Error: failed to add socket to epoll fd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError())); - LogPrintf("%s\n", strError.original); - return false; - } + if (m_edge_trig_events && !m_edge_trig_events->AddSocket(sock->Get())) { + LogPrintf("Error: EdgeTriggeredEvents::AddSocket() failed\n"); + return false; } -#endif vhListenSocket.push_back(ListenSocket(sock->Release(), permissions)); @@ -3553,23 +3533,15 @@ void CConnman::StopNodes() for (CNode *pnode : m_nodes) pnode->CloseSocketDisconnect(this); } - for (ListenSocket& hListenSocket : vhListenSocket) + for (ListenSocket& hListenSocket : vhListenSocket) { if (hListenSocket.socket != INVALID_SOCKET) { -#ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue) { - struct kevent event; - EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr); + if (m_edge_trig_events && !m_edge_trig_events->RemoveSocket(hListenSocket.socket)) { + LogPrintf("EdgeTriggeredEvents::RemoveSocket() failed\n"); } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll) { - epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr); - } -#endif if (!CloseSocket(hListenSocket.socket)) LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError())); } + } // clean up some globals (to help leak detection) std::vector nodes; diff --git a/src/util/edge.cpp b/src/util/edge.cpp index a813653607606..bcdaa607b4e8f 100644 --- a/src/util/edge.cpp +++ b/src/util/edge.cpp @@ -58,3 +58,70 @@ EdgeTriggeredEvents::~EdgeTriggeredEvents() #endif /* defined(USE_KQUEUE) || defined(USE_EPOLL) */ } } + +bool EdgeTriggeredEvents::AddSocket(SOCKET socket) const +{ + assert(m_valid); + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + epoll_event event; + event.data.fd = socket; + event.events = EPOLLIN; + if (epoll_ctl(m_fd, EPOLL_CTL_ADD, socket, &event) != 0) { + LogPrintf("Failed to add socket to epoll fd (epoll_ctl returned error %s)\n", + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent event; + EV_SET(&event, socket, EVFILT_READ, EV_ADD, 0, 0, nullptr); + if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to add socket to kqueue fd (kevent returned error %s)\n", + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} + +bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const +{ + assert(m_valid); + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + if (epoll_ctl(m_fd, EPOLL_CTL_DEL, socket, nullptr) != 0) { + LogPrintf("Failed to remove socket from epoll fd (epoll_ctl returned error %s)\n", + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent event; + EV_SET(&event, socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to remove socket from kqueue fd (kevent returned error %s)\n", + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} diff --git a/src/util/edge.h b/src/util/edge.h index c4d38c3922db2..37adb9e68bcb9 100644 --- a/src/util/edge.h +++ b/src/util/edge.h @@ -5,6 +5,8 @@ #ifndef BITCOIN_UTIL_EDGE_H #define BITCOIN_UTIL_EDGE_H +#include + #include enum class SocketEventsMode : int8_t; @@ -19,6 +21,11 @@ class EdgeTriggeredEvents explicit EdgeTriggeredEvents(SocketEventsMode events_mode); ~EdgeTriggeredEvents(); + /* Add socket to interest list */ + bool AddSocket(SOCKET socket) const; + /* Remove socket from interest list */ + bool RemoveSocket(SOCKET socket) const; + bool IsValid() const { return m_valid; } public: From f50c710028d708bfd6d722388da334d025ab2a3a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 9 May 2024 13:57:58 +0000 Subject: [PATCH 4/9] refactor: move `CConnman::`(`Un`)`registerEvents` to ETE --- src/net.cpp | 91 ++++++++--------------------------------------- src/net.h | 3 -- src/util/edge.cpp | 77 +++++++++++++++++++++++++++++++++++++++ src/util/edge.h | 5 +++ 4 files changed, 97 insertions(+), 79 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index fae20387a4968..e96e45cfb5291 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -563,7 +563,9 @@ void CNode::CloseSocketDisconnect(CConnman* connman) } } - connman->UnregisterEvents(this); + if (connman->m_edge_trig_events && !connman->m_edge_trig_events->UnregisterEvents(hSocket)) { + LogPrint(BCLog::NET, "EdgeTriggeredEvents::UnregisterEvents() failed\n"); + } LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); CloseSocket(hSocket); @@ -1276,7 +1278,12 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket, LOCK(m_nodes_mutex); m_nodes.push_back(pnode); WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode)); - RegisterEvents(pnode); + if (m_edge_trig_events) { + LOCK(pnode->cs_hSocket); + if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) { + LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n"); + } + } WakeSelect(); } @@ -2980,7 +2987,12 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai { LOCK(m_nodes_mutex); m_nodes.push_back(pnode); - RegisterEvents(pnode); + if (m_edge_trig_events) { + LOCK(pnode->cs_hSocket); + if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) { + LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n"); + } + } WakeSelect(); } } @@ -4193,79 +4205,6 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize(); } -void CConnman::RegisterEvents(CNode *pnode) -{ -#ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue) { - LOCK(pnode->cs_hSocket); - assert(pnode->hSocket != INVALID_SOCKET); - - struct kevent events[2]; - EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr); - EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr); - - int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - m_edge_trig_events->m_fd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll) { - LOCK(pnode->cs_hSocket); - assert(pnode->hSocket != INVALID_SOCKET); - - epoll_event e; - // We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in - e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; - e.data.fd = pnode->hSocket; - - int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, pnode->hSocket, &e); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - m_edge_trig_events->m_fd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -} - -void CConnman::UnregisterEvents(CNode *pnode) -{ -#ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue) { - AssertLockHeld(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) { - return; - } - - struct kevent events[2]; - EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); - - int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - m_edge_trig_events->m_fd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll) { - AssertLockHeld(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) { - return; - } - - int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, pnode->hSocket, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - m_edge_trig_events->m_fd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -} - void CaptureMessageToFile(const CAddress& addr, const std::string& msg_type, Span data, diff --git a/src/net.h b/src/net.h index fc4bac01ec809..3c0a3f7e8d599 100644 --- a/src/net.h +++ b/src/net.h @@ -1371,9 +1371,6 @@ friend class CNode; // Whether the node should be passed out in ForEach* callbacks static bool NodeFullyConnected(const CNode* pnode); - void RegisterEvents(CNode* pnode); - void UnregisterEvents(CNode* pnode); - // Network usage totals mutable Mutex m_total_bytes_sent_mutex; std::atomic nTotalBytesRecv{0}; diff --git a/src/util/edge.cpp b/src/util/edge.cpp index bcdaa607b4e8f..8d15eee04e714 100644 --- a/src/util/edge.cpp +++ b/src/util/edge.cpp @@ -125,3 +125,80 @@ bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const } return true; } + +bool EdgeTriggeredEvents::RegisterEvents(SOCKET socket) const +{ + assert(m_valid && socket != INVALID_SOCKET); + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + epoll_event e; + // We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in + e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; + e.data.fd = socket; + + if (epoll_ctl(m_fd, EPOLL_CTL_ADD, socket, &e) != 0) { + LogPrintf("Failed to register events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n", + m_fd, EPOLL_CTL_ADD, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent events[2]; + EV_SET(&events[0], socket, EVFILT_READ, EV_ADD, 0, 0, nullptr); + EV_SET(&events[1], socket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr); + + if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to register events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n", + m_fd, EV_ADD, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} + +bool EdgeTriggeredEvents::UnregisterEvents(SOCKET socket) const +{ + assert(m_valid); + + if (socket == INVALID_SOCKET) { + LogPrintf("Cannot unregister events for invalid socket\n"); + return false; + } + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + if (epoll_ctl(m_fd, EPOLL_CTL_DEL, socket, nullptr) != 0) { + LogPrintf("Failed to unregister events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n", + m_fd, EPOLL_CTL_DEL, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent events[2]; + EV_SET(&events[0], socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + EV_SET(&events[1], socket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); + if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to unregister events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n", + m_fd, EV_DELETE, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} diff --git a/src/util/edge.h b/src/util/edge.h index 37adb9e68bcb9..fb8415b39ae75 100644 --- a/src/util/edge.h +++ b/src/util/edge.h @@ -28,6 +28,11 @@ class EdgeTriggeredEvents bool IsValid() const { return m_valid; } + /* Register events for socket */ + bool RegisterEvents(SOCKET socket) const; + /* Unregister events for socket */ + bool UnregisterEvents(SOCKET socket) const; + public: /* File descriptor used to interact with events mode */ int m_fd{-1}; From ed7d976c3e62a09d1d32c9092065e7090259d421 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 10 May 2024 18:16:51 +0000 Subject: [PATCH 5/9] refactor: move wakeup pipe (de)registration to ETE --- src/net.cpp | 44 ++++++-------------------------------- src/util/edge.cpp | 54 +++++++++++++++++++++++++++++++++++++---------- src/util/edge.h | 14 +++++++++++- 3 files changed, 62 insertions(+), 50 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index e96e45cfb5291..34cc388924660 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3385,31 +3385,9 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) { LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); } -#ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue) { - struct kevent event; - EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr); - int r = kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - m_edge_trig_events->m_fd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); - return false; - } + if (m_edge_trig_events && !m_edge_trig_events->RegisterPipe(wakeupPipe[0])) { + LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterPipe() failed\n"); } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll) { - epoll_event event; - event.events = EPOLLIN; - event.data.fd = wakeupPipe[0]; - int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], &event); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - m_edge_trig_events->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); - return false; - } - } -#endif } #endif @@ -3578,24 +3556,14 @@ void CConnman::StopNodes() semOutbound.reset(); semAddnode.reset(); -#ifdef USE_KQUEUE - if (socketEventsMode == SocketEventsMode::KQueue && Assert(m_edge_trig_events)->m_fd != -1) { + if (m_edge_trig_events) { #ifdef USE_WAKEUP_PIPE - struct kevent event; - EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(m_edge_trig_events->m_fd, &event, 1, nullptr, 0, nullptr); -#endif - m_edge_trig_events.reset(); - } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SocketEventsMode::EPoll && Assert(m_edge_trig_events)->m_fd != -1) { -#ifdef USE_WAKEUP_PIPE - epoll_ctl(m_edge_trig_events->m_fd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr); + if (!m_edge_trig_events->UnregisterPipe(wakeupPipe[0])) { + LogPrintf("EdgeTriggeredEvents::UnregisterPipe() failed\n"); + } #endif m_edge_trig_events.reset(); } -#endif #ifdef USE_WAKEUP_PIPE if (wakeupPipe[0] != -1) close(wakeupPipe[0]); diff --git a/src/util/edge.cpp b/src/util/edge.cpp index 8d15eee04e714..a8b464040e9c0 100644 --- a/src/util/edge.cpp +++ b/src/util/edge.cpp @@ -59,17 +59,17 @@ EdgeTriggeredEvents::~EdgeTriggeredEvents() } } -bool EdgeTriggeredEvents::AddSocket(SOCKET socket) const +bool EdgeTriggeredEvents::RegisterEntity(int entity, std::string entity_name) const { assert(m_valid); if (m_mode == SocketEventsMode::EPoll) { #ifdef USE_EPOLL epoll_event event; - event.data.fd = socket; + event.data.fd = entity; event.events = EPOLLIN; - if (epoll_ctl(m_fd, EPOLL_CTL_ADD, socket, &event) != 0) { - LogPrintf("Failed to add socket to epoll fd (epoll_ctl returned error %s)\n", + if (epoll_ctl(m_fd, EPOLL_CTL_ADD, entity, &event) != 0) { + LogPrintf("Failed to add %s to epoll fd (epoll_ctl returned error %s)\n", entity_name, NetworkErrorString(WSAGetLastError())); return false; } @@ -79,9 +79,9 @@ bool EdgeTriggeredEvents::AddSocket(SOCKET socket) const } else if (m_mode == SocketEventsMode::KQueue) { #ifdef USE_KQUEUE struct kevent event; - EV_SET(&event, socket, EVFILT_READ, EV_ADD, 0, 0, nullptr); + EV_SET(&event, entity, EVFILT_READ, EV_ADD, 0, 0, nullptr); if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) { - LogPrintf("Failed to add socket to kqueue fd (kevent returned error %s)\n", + LogPrintf("Failed to add %s to kqueue fd (kevent returned error %s)\n", entity_name, NetworkErrorString(WSAGetLastError())); return false; } @@ -94,14 +94,14 @@ bool EdgeTriggeredEvents::AddSocket(SOCKET socket) const return true; } -bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const +bool EdgeTriggeredEvents::UnregisterEntity(int entity, std::string entity_name) const { assert(m_valid); if (m_mode == SocketEventsMode::EPoll) { #ifdef USE_EPOLL - if (epoll_ctl(m_fd, EPOLL_CTL_DEL, socket, nullptr) != 0) { - LogPrintf("Failed to remove socket from epoll fd (epoll_ctl returned error %s)\n", + if (epoll_ctl(m_fd, EPOLL_CTL_DEL, entity, nullptr) != 0) { + LogPrintf("Failed to remove %s from epoll fd (epoll_ctl returned error %s)\n", entity_name, NetworkErrorString(WSAGetLastError())); return false; } @@ -111,9 +111,9 @@ bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const } else if (m_mode == SocketEventsMode::KQueue) { #ifdef USE_KQUEUE struct kevent event; - EV_SET(&event, socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + EV_SET(&event, entity, EVFILT_READ, EV_DELETE, 0, 0, nullptr); if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) { - LogPrintf("Failed to remove socket from kqueue fd (kevent returned error %s)\n", + LogPrintf("Failed to remove %s from kqueue fd (kevent returned error %s)\n", entity_name, NetworkErrorString(WSAGetLastError())); return false; } @@ -126,6 +126,38 @@ bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const return true; } +bool EdgeTriggeredEvents::AddSocket(SOCKET socket) const +{ + return RegisterEntity(socket, "socket"); +} + +bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const +{ + return UnregisterEntity(socket, "socket"); +} + +bool EdgeTriggeredEvents::RegisterPipe(int wakeup_pipe) +{ + if (m_pipe_registered) { + LogPrintf("Pipe already registered, ignoring new registration request\n"); + return false; + } + bool ret = RegisterEntity(wakeup_pipe, "wakeup pipe"); + if (ret) m_pipe_registered = true; + return ret; +} + +bool EdgeTriggeredEvents::UnregisterPipe(int wakeup_pipe) +{ + if (!m_pipe_registered) { + LogPrintf("No pipe currently registered to unregister, ignoring request\n"); + return false; + } + bool ret = UnregisterEntity(wakeup_pipe, "wakeup pipe"); + if (ret) m_pipe_registered = false; + return ret; +} + bool EdgeTriggeredEvents::RegisterEvents(SOCKET socket) const { assert(m_valid && socket != INVALID_SOCKET); diff --git a/src/util/edge.h b/src/util/edge.h index fb8415b39ae75..d75ef4d3092c4 100644 --- a/src/util/edge.h +++ b/src/util/edge.h @@ -8,6 +8,7 @@ #include #include +#include enum class SocketEventsMode : int8_t; @@ -21,23 +22,34 @@ class EdgeTriggeredEvents explicit EdgeTriggeredEvents(SocketEventsMode events_mode); ~EdgeTriggeredEvents(); + bool IsValid() const { return m_valid; } + /* Add socket to interest list */ bool AddSocket(SOCKET socket) const; /* Remove socket from interest list */ bool RemoveSocket(SOCKET socket) const; - bool IsValid() const { return m_valid; } + /* Register wakeup pipe with EdgeTriggeredEvents instance */ + bool RegisterPipe(int wakeup_pipe); + /* Unregister wakeup pipe with EdgeTriggeredEvents instance */ + bool UnregisterPipe(int wakeup_pipe); /* Register events for socket */ bool RegisterEvents(SOCKET socket) const; /* Unregister events for socket */ bool UnregisterEvents(SOCKET socket) const; +private: + bool RegisterEntity(int entity, std::string entity_name) const; + bool UnregisterEntity(int entity, std::string entity_name) const; + public: /* File descriptor used to interact with events mode */ int m_fd{-1}; private: + /* Flag set if pipe has been registered with instance */ + bool m_pipe_registered{false}; /* Instance validity flag set during construction */ bool m_valid{false}; /* Flag for storing selected socket events mode */ From b8c3b480eb742773d629309ba6525d4735eacd2b Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 10 May 2024 18:19:03 +0000 Subject: [PATCH 6/9] refactor: introduce `WakeupPipe`, move wakeup select pipe logic there --- src/Makefile.am | 2 + src/net.cpp | 124 +++++++++++++++------------------------------ src/net.h | 23 +++++---- src/util/wpipe.cpp | 79 +++++++++++++++++++++++++++++ src/util/wpipe.h | 59 +++++++++++++++++++++ 5 files changed, 193 insertions(+), 94 deletions(-) create mode 100644 src/util/wpipe.cpp create mode 100644 src/util/wpipe.h diff --git a/src/Makefile.am b/src/Makefile.am index dadf9224ffa4c..03e24588136cb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -363,6 +363,7 @@ BITCOIN_CORE_H = \ util/ui_change_type.h \ util/url.h \ util/vector.h \ + util/wpipe.h \ validation.h \ validationinterface.h \ versionbits.h \ @@ -797,6 +798,7 @@ libbitcoin_util_a_SOURCES = \ util/thread.cpp \ util/threadnames.cpp \ util/tokenpipe.cpp \ + util/wpipe.cpp \ $(BITCOIN_CORE_H) if USE_LIBEVENT diff --git a/src/net.cpp b/src/net.cpp index 34cc388924660..785ff0095bbf6 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include // for fDIP0001ActiveAtTip #include @@ -119,7 +120,7 @@ static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50; // We are however still somewhat limited in how long we can sleep as there is periodic work (cleanup) to be done in // the socket handler thread static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 500; -#endif +#endif /* USE_WAKEUP_PIPE */ const std::string NET_MESSAGE_COMMAND_OTHER = "*other*"; @@ -1284,7 +1285,9 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket, LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n"); } } - WakeSelect(); + if (m_wakeup_pipe) { + m_wakeup_pipe->Write(); + } } // We received a new connection, harvest entropy from the time (and our peer count) @@ -1569,14 +1572,14 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, } } -#ifdef USE_WAKEUP_PIPE - // We add a pipe to the read set so that the select() call can be woken up from the outside - // This is done when data is added to send buffers (vSendMsg) or when new peers are added - // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to - // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually - // run on Linux and friends. - recv_set.insert(wakeupPipe[0]); -#endif + if (m_wakeup_pipe) { + // We add a pipe to the read set so that the select() call can be woken up from the outside + // This is done when data is added to send buffers (vSendMsg) or when new peers are added + // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to + // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually + // run on Linux and friends. + recv_set.insert(m_wakeup_pipe->m_pipe[0]); + } return !recv_set.empty() || !send_set.empty() || !error_set.empty(); } @@ -1594,9 +1597,8 @@ void CConnman::SocketEventsKqueue(std::set& recv_set, timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000; timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; - wakeupSelectNeeded = true; - int n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout); - wakeupSelectNeeded = false; + int n{-1}; + ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);}); if (n == -1) { LogPrintf("kevent wait error\n"); } else if (n > 0) { @@ -1628,9 +1630,8 @@ void CConnman::SocketEventsEpoll(std::set& recv_set, const size_t maxEvents = 64; epoll_event events[maxEvents]; - wakeupSelectNeeded = true; - int n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); - wakeupSelectNeeded = false; + int n{-1}; + ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); for (int i = 0; i < n; i++) { auto& e = events[i]; if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) { @@ -1685,9 +1686,8 @@ void CConnman::SocketEventsPoll(const std::vector& nodes, vpollfds.push_back(std::move(it.second)); } - wakeupSelectNeeded = true; - int r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); - wakeupSelectNeeded = false; + int r{-1}; + ToggleWakeupPipe([&](){r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); if (r < 0) { return; } @@ -1744,9 +1744,8 @@ void CConnman::SocketEventsSelect(const std::vector& nodes, hSocketMax = std::max(hSocketMax, hSocket); } - wakeupSelectNeeded = true; - int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - wakeupSelectNeeded = false; + int nSelect{-1}; + ToggleWakeupPipe([&](){nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);}); if (interruptNet) return; @@ -1849,18 +1848,10 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) // empty sets. SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll); -#ifdef USE_WAKEUP_PIPE - // drain the wakeup pipe - if (recv_set.count(wakeupPipe[0])) { - char buf[128]; - while (true) { - int r = read(wakeupPipe[0], buf, sizeof(buf)); - if (r <= 0) { - break; - } - } + // Drain the wakeup pipe + if (m_wakeup_pipe && recv_set.count(m_wakeup_pipe->m_pipe[0])) { + m_wakeup_pipe->Drain(); } -#endif // Service (send/receive) each of the already connected nodes. SocketHandlerConnected(recv_set, send_set, error_set); @@ -2138,22 +2129,6 @@ void CConnman::WakeMessageHandler() condMsgProc.notify_one(); } -void CConnman::WakeSelect() -{ -#ifdef USE_WAKEUP_PIPE - if (wakeupPipe[1] == -1) { - return; - } - - char buf{0}; - if (write(wakeupPipe[1], &buf, sizeof(buf)) != 1) { - LogPrint(BCLog::NET, "write to wakeupPipe failed\n"); - } -#endif - - wakeupSelectNeeded = false; -} - void CConnman::ThreadDNSAddressSeed() { FastRandomContext rng; @@ -2993,7 +2968,9 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n"); } } - WakeSelect(); + if (m_wakeup_pipe) { + m_wakeup_pipe->Write(); + } } } @@ -3373,23 +3350,13 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met } #ifdef USE_WAKEUP_PIPE - if (pipe(wakeupPipe) != 0) { - wakeupPipe[0] = wakeupPipe[1] = -1; - LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n"); - } else { - int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0); - if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) { - LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); - } - fFlags = fcntl(wakeupPipe[1], F_GETFL, 0); - if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) { - LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); - } - if (m_edge_trig_events && !m_edge_trig_events->RegisterPipe(wakeupPipe[0])) { - LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterPipe() failed\n"); - } + m_wakeup_pipe = std::make_unique(m_edge_trig_events.get()); + if (!m_wakeup_pipe->IsValid()) { + /* We log the error but do not halt initialization */ + LogPrintf("Unable to initialize WakeupPipe instance\n"); + m_wakeup_pipe.reset(); } -#endif +#endif /* USE_WAKEUP_PIPE */ // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&util::TraceThread, "net", [this, &mn_sync] { ThreadSocketHandler(mn_sync); }); @@ -3555,21 +3522,12 @@ void CConnman::StopNodes() vhListenSocket.clear(); semOutbound.reset(); semAddnode.reset(); - - if (m_edge_trig_events) { -#ifdef USE_WAKEUP_PIPE - if (!m_edge_trig_events->UnregisterPipe(wakeupPipe[0])) { - LogPrintf("EdgeTriggeredEvents::UnregisterPipe() failed\n"); - } -#endif - m_edge_trig_events.reset(); - } - -#ifdef USE_WAKEUP_PIPE - if (wakeupPipe[0] != -1) close(wakeupPipe[0]); - if (wakeupPipe[1] != -1) close(wakeupPipe[1]); - wakeupPipe[0] = wakeupPipe[1] = -1; -#endif + /** + * m_wakeup_pipe must be reset *before* m_edge_trig_events as it may + * attempt to call EdgeTriggeredEvents::UnregisterPipe() in its destructor + */ + m_wakeup_pipe.reset(); + m_edge_trig_events.reset(); } void CConnman::DeleteNode(CNode* pnode) @@ -4082,8 +4040,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) } // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) - if (!hasPendingData && wakeupSelectNeeded) - WakeSelect(); + if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load())) + m_wakeup_pipe->Write(); } } diff --git a/src/net.h b/src/net.h index 3c0a3f7e8d599..4e40817027397 100644 --- a/src/net.h +++ b/src/net.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -45,10 +46,6 @@ #include #include -#ifndef WIN32 -#define USE_WAKEUP_PIPE -#endif - class CConnman; class CDeterministicMNList; class CDeterministicMNManager; @@ -1168,7 +1165,6 @@ friend class CNode; unsigned int GetReceiveFloodSize() const; void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); - void WakeSelect() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); /** Attempts to obfuscate tx time through exponentially distributed emitting. Works assuming that a single interval is used. @@ -1505,14 +1501,19 @@ friend class CNode; */ std::unique_ptr m_i2p_sam_session; -#ifdef USE_WAKEUP_PIPE - /** a pipe which is added to select() calls to wakeup before the timeout */ - int wakeupPipe[2]{-1,-1}; -#endif - std::atomic wakeupSelectNeeded{false}; - SocketEventsMode socketEventsMode; std::unique_ptr m_edge_trig_events{nullptr}; + std::unique_ptr m_wakeup_pipe{nullptr}; + + template + void ToggleWakeupPipe(Callable&& func) + { + if (m_wakeup_pipe) { + m_wakeup_pipe->Toggle(func); + } else { + func(); + } + } Mutex cs_sendable_receivable_nodes; std::unordered_map mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); diff --git a/src/util/wpipe.cpp b/src/util/wpipe.cpp new file mode 100644 index 0000000000000..01153395e6cc4 --- /dev/null +++ b/src/util/wpipe.cpp @@ -0,0 +1,79 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include + +WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events) + : m_edge_trig_events{edge_trig_events} +{ +#ifdef USE_WAKEUP_PIPE + if (pipe(m_pipe.data()) != 0) { + LogPrintf("Unable to initialize WakeupPipe, pipe() for m_pipe failed\n"); + return; + } + for (size_t idx = 0; idx < m_pipe.size(); idx++) { + int flags = fcntl(m_pipe[idx], F_GETFL, 0); + if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) { + LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed\n", idx); + return; + } + } + if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) { + LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed\n"); + return; + } + m_valid = true; +#else + LogPrintf("Attempting to initialize WakeupPipe without support compiled in!\n"); +#endif /* USE_WAKEUP_PIPE */ +} + +WakeupPipe::~WakeupPipe() +{ + if (m_valid) { +#ifdef USE_WAKEUP_PIPE + if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) { + LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed\n"); + } + close(m_pipe[0]); + close(m_pipe[1]); +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ + } +} + +void WakeupPipe::Drain() const +{ +#ifdef USE_WAKEUP_PIPE + assert(m_valid && m_pipe[0] != -1); + + int ret{0}; + std::array buf; + do { + ret = read(m_pipe[0], buf.data(), buf.size()); + } while (ret > 0); +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ +} + +void WakeupPipe::Write() +{ +#ifdef USE_WAKEUP_PIPE + assert(m_valid && m_pipe[1] != -1); + + std::array buf; + if (write(m_pipe[1], buf.data(), buf.size()) != 1) { + LogPrintf("Write to m_pipe[1] failed\n"); + } + + m_need_wakeup = false; +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ +} diff --git a/src/util/wpipe.h b/src/util/wpipe.h new file mode 100644 index 0000000000000..c4fff558ecb53 --- /dev/null +++ b/src/util/wpipe.h @@ -0,0 +1,59 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_WPIPE_H +#define BITCOIN_UTIL_WPIPE_H + +#include +#include +#include + +#ifndef WIN32 +#define USE_WAKEUP_PIPE +#endif + +class EdgeTriggeredEvents; + +/** + * A manager for abstracting logic surrounding wakeup pipes. Supported only on + * platforms with a POSIX API. Disabled on Windows. + */ +class WakeupPipe +{ +public: + explicit WakeupPipe(EdgeTriggeredEvents* edge_trig_events); + ~WakeupPipe(); + + bool IsValid() const { return m_valid; }; + + /* Drain pipe of all contents */ + void Drain() const; + /* Write a byte to the pipe */ + void Write(); + + /* Used to wrap calls around m_need_wakeup toggling */ + template + void Toggle(Callable&& func) + { + assert(m_valid); + + m_need_wakeup = true; + func(); + m_need_wakeup = false; + } + +public: + /* File descriptors for read and write data channels */ + std::array m_pipe{{ -1, -1 }}; + /* Flag used to determine if Write() needs to be called. Used occasionally */ + std::atomic m_need_wakeup{false}; + +private: + /* Instance validity flag set during construction */ + bool m_valid{false}; + /* Pointer to EdgeTriggeredEvents instance used for pipe (de)registration if using supported events modes */ + EdgeTriggeredEvents* m_edge_trig_events{nullptr}; +}; + +#endif /* BITCOIN_UTIL_WPIPE_H */ From f24520a3a2551386c74e463d7a2c460b10227b4a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 10 May 2024 18:19:35 +0000 Subject: [PATCH 7/9] net: log `close` failures in `EdgeTriggerEvents` and `WakeupPipe` --- src/util/edge.cpp | 5 ++++- src/util/wpipe.cpp | 9 +++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/util/edge.cpp b/src/util/edge.cpp index a8b464040e9c0..285fc3080301a 100644 --- a/src/util/edge.cpp +++ b/src/util/edge.cpp @@ -52,7 +52,10 @@ EdgeTriggeredEvents::~EdgeTriggeredEvents() { if (m_valid) { #if defined(USE_KQUEUE) || defined(USE_EPOLL) - close(m_fd); + if (close(m_fd) != 0) { + LogPrintf("Destroying EdgeTriggeredEvents instance, close() failed for m_fd = %d with error %s\n", m_fd, + NetworkErrorString(WSAGetLastError())); + } #else assert(false); #endif /* defined(USE_KQUEUE) || defined(USE_EPOLL) */ diff --git a/src/util/wpipe.cpp b/src/util/wpipe.cpp index 01153395e6cc4..f4210d6703922 100644 --- a/src/util/wpipe.cpp +++ b/src/util/wpipe.cpp @@ -6,6 +6,7 @@ #include #include +#include WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events) : m_edge_trig_events{edge_trig_events} @@ -39,8 +40,12 @@ WakeupPipe::~WakeupPipe() if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) { LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed\n"); } - close(m_pipe[0]); - close(m_pipe[1]); + for (size_t idx = 0; idx < m_pipe.size(); idx++) { + if (close(m_pipe[idx]) != 0) { + LogPrintf("Destroying WakeupPipe instance, close() failed for m_pipe[%d] = %d with error %s\n", + idx, m_pipe[idx], NetworkErrorString(WSAGetLastError())); + } + } #else assert(false); #endif /* USE_WAKEUP_PIPE */ From ec99294976dea247956899f9b0f9cd7b33ba9ada Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 4 May 2024 13:32:15 +0000 Subject: [PATCH 8/9] net: restrict access `EdgeTriggerEvents` members - File descriptor creation and destruction are handled within ETE, no reason to expose read-write access to it. - (Un)registerPipe should only be used within WakeupPipe, remove from public view. --- src/net.cpp | 4 ++-- src/util/edge.cpp | 2 -- src/util/edge.h | 20 +++++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 785ff0095bbf6..0426396257b2b 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1598,7 +1598,7 @@ void CConnman::SocketEventsKqueue(std::set& recv_set, timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; int n{-1}; - ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);}); + ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events, maxEvents, &timeout);}); if (n == -1) { LogPrintf("kevent wait error\n"); } else if (n > 0) { @@ -1631,7 +1631,7 @@ void CConnman::SocketEventsEpoll(std::set& recv_set, epoll_event events[maxEvents]; int n{-1}; - ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); + ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); for (int i = 0; i < n; i++) { auto& e = events[i]; if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) { diff --git a/src/util/edge.cpp b/src/util/edge.cpp index 285fc3080301a..482600c8c03ea 100644 --- a/src/util/edge.cpp +++ b/src/util/edge.cpp @@ -7,8 +7,6 @@ #include #include -#include - #ifdef USE_EPOLL #include #endif diff --git a/src/util/edge.h b/src/util/edge.h index d75ef4d3092c4..b605cf817c061 100644 --- a/src/util/edge.h +++ b/src/util/edge.h @@ -7,6 +7,7 @@ #include +#include #include #include @@ -23,30 +24,29 @@ class EdgeTriggeredEvents ~EdgeTriggeredEvents(); bool IsValid() const { return m_valid; } + int GetFileDescriptor() const { assert(m_fd != -1); return m_fd; } /* Add socket to interest list */ bool AddSocket(SOCKET socket) const; /* Remove socket from interest list */ bool RemoveSocket(SOCKET socket) const; - /* Register wakeup pipe with EdgeTriggeredEvents instance */ - bool RegisterPipe(int wakeup_pipe); - /* Unregister wakeup pipe with EdgeTriggeredEvents instance */ - bool UnregisterPipe(int wakeup_pipe); - /* Register events for socket */ bool RegisterEvents(SOCKET socket) const; /* Unregister events for socket */ bool UnregisterEvents(SOCKET socket) const; +private: + friend class WakeupPipe; + /* Register wakeup pipe with EdgeTriggeredEvents instance */ + bool RegisterPipe(int wakeup_pipe); + /* Unregister wakeup pipe with EdgeTriggeredEvents instance */ + bool UnregisterPipe(int wakeup_pipe); + private: bool RegisterEntity(int entity, std::string entity_name) const; bool UnregisterEntity(int entity, std::string entity_name) const; -public: - /* File descriptor used to interact with events mode */ - int m_fd{-1}; - private: /* Flag set if pipe has been registered with instance */ bool m_pipe_registered{false}; @@ -54,6 +54,8 @@ class EdgeTriggeredEvents bool m_valid{false}; /* Flag for storing selected socket events mode */ SocketEventsMode m_mode; + /* File descriptor used to interact with events mode */ + int m_fd{-1}; }; #endif /* BITCOIN_UTIL_EDGE_H */ From bd8b5d40073bcc54bd29663f06d965fafe349b08 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 4 May 2024 18:13:50 +0000 Subject: [PATCH 9/9] net: add more details to log information in ETE and `WakeupPipes` --- src/util/edge.cpp | 6 ++++-- src/util/wpipe.cpp | 25 ++++++++++++++++++------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/util/edge.cpp b/src/util/edge.cpp index 482600c8c03ea..bb7f867127aec 100644 --- a/src/util/edge.cpp +++ b/src/util/edge.cpp @@ -22,7 +22,8 @@ EdgeTriggeredEvents::EdgeTriggeredEvents(SocketEventsMode events_mode) #ifdef USE_EPOLL m_fd = epoll_create1(0); if (m_fd == -1) { - LogPrintf("Unable to initialize EdgeTriggeredEvents, epoll_create1 returned -1\n"); + LogPrintf("Unable to initialize EdgeTriggeredEvents, epoll_create1 returned -1 with error %s\n", + NetworkErrorString(WSAGetLastError())); return; } #else @@ -33,7 +34,8 @@ EdgeTriggeredEvents::EdgeTriggeredEvents(SocketEventsMode events_mode) #ifdef USE_KQUEUE m_fd = kqueue(); if (m_fd == -1) { - LogPrintf("Unable to initialize EdgeTriggeredEvents, kqueue returned -1\n"); + LogPrintf("Unable to initialize EdgeTriggeredEvents, kqueue returned -1 with error %s\n", + NetworkErrorString(WSAGetLastError())); return; } #else diff --git a/src/util/wpipe.cpp b/src/util/wpipe.cpp index f4210d6703922..732e23c791e5e 100644 --- a/src/util/wpipe.cpp +++ b/src/util/wpipe.cpp @@ -8,23 +8,28 @@ #include #include +static constexpr int EXPECTED_PIPE_WRITTEN_BYTES = 1; + WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events) : m_edge_trig_events{edge_trig_events} { #ifdef USE_WAKEUP_PIPE if (pipe(m_pipe.data()) != 0) { - LogPrintf("Unable to initialize WakeupPipe, pipe() for m_pipe failed\n"); + LogPrintf("Unable to initialize WakeupPipe, pipe() for m_pipe failed with error %s\n", + NetworkErrorString(WSAGetLastError())); return; } for (size_t idx = 0; idx < m_pipe.size(); idx++) { int flags = fcntl(m_pipe[idx], F_GETFL, 0); if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) { - LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed\n", idx); + LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed with error %s\n", idx, + NetworkErrorString(WSAGetLastError())); return; } } if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) { - LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed\n"); + LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed for m_pipe[0] = %d\n", + m_pipe[0]); return; } m_valid = true; @@ -38,7 +43,8 @@ WakeupPipe::~WakeupPipe() if (m_valid) { #ifdef USE_WAKEUP_PIPE if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) { - LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed\n"); + LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed for m_pipe[0] = %d\n", + m_pipe[0]); } for (size_t idx = 0; idx < m_pipe.size(); idx++) { if (close(m_pipe[idx]) != 0) { @@ -72,9 +78,14 @@ void WakeupPipe::Write() #ifdef USE_WAKEUP_PIPE assert(m_valid && m_pipe[1] != -1); - std::array buf; - if (write(m_pipe[1], buf.data(), buf.size()) != 1) { - LogPrintf("Write to m_pipe[1] failed\n"); + std::array buf; + int ret = write(m_pipe[1], buf.data(), buf.size()); + if (ret == -1) { + LogPrintf("write() to m_pipe[1] = %d failed with error %s\n", m_pipe[1], NetworkErrorString(WSAGetLastError())); + } + if (ret != EXPECTED_PIPE_WRITTEN_BYTES) { + LogPrintf("write() to m_pipe[1] = %d succeeded with unexpected result %d (expected %d)\n", m_pipe[1], ret, + EXPECTED_PIPE_WRITTEN_BYTES); } m_need_wakeup = false;