Skip to content

Commit

Permalink
Merge #5956: refactor: add cs_mapSocketToNode and cs_sendable_receiva…
Browse files Browse the repository at this point in the history
…ble_nodes to minimize cs_vNode contention (and document an undocumented lock requirement)

0b8fe48 refactor: add cs_mapSocketToNode and cs_sendable_receivable_nodes to minimize cs_vNode contention (and document an undocumented lock requirement) (pasta)

Pull request description:

  ## Issue being fixed or feature implemented
  Reduce locking contention on cs_vNode

  ## What was done?
  added new mutex

  ## How Has This Been Tested?
  Building running locally on testnet

  ## Breaking Changes
  None

  ## Checklist:
  - [ ] I have performed a self-review of my own code
  - [ ] I have commented my code, particularly in hard-to-understand areas
  - [ ] I have added or updated relevant unit/integration/functional/e2e tests
  - [ ] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone

Top commit has no ACKs.

Tree-SHA512: b7a3aa8078cf7f2b5f9a833a345e405351a927fa517993d77cefa77c9d109c5d92e06b6d3f91093b0fa4d91c4a464f19ce4eb457e486f9532ce890f02b9b99e1
  • Loading branch information
PastaPastaPasta committed Apr 3, 2024
2 parents c0c6a90 + 0b8fe48 commit fbcef10
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 42 deletions.
87 changes: 49 additions & 38 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
AssertLockHeld(connman->cs_vNodes);

fDisconnect = true;
LOCK(cs_hSocket);
LOCK2(connman->cs_mapSocketToNode, cs_hSocket);
if (hSocket == INVALID_SOCKET) {
return;
}
Expand All @@ -554,8 +554,11 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
fCanSendData = false;

connman->mapSocketToNode.erase(hSocket);
connman->mapReceivableNodes.erase(GetId());
connman->mapSendableNodes.erase(GetId());
{
LOCK(connman->cs_sendable_receivable_nodes);
connman->mapReceivableNodes.erase(GetId());
connman->mapSendableNodes.erase(GetId());
}
{
LOCK(connman->cs_mapNodesWithDataToSend);
if (connman->mapNodesWithDataToSend.erase(GetId()) != 0) {
Expand Down Expand Up @@ -1265,7 +1268,7 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
{
LOCK(cs_vNodes);
vNodes.push_back(pnode);
mapSocketToNode.emplace(hSocket, pnode);
WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode));
RegisterEvents(pnode);
WakeSelect();
}
Expand Down Expand Up @@ -1778,23 +1781,25 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s

void CConnman::SocketHandler()
{
bool fOnlyPoll = false;
{
bool fOnlyPoll = [this]() {
// check if we have work to do and thus should avoid waiting for events
LOCK2(cs_vNodes, cs_mapNodesWithDataToSend);
LOCK2(cs_vNodes, cs_sendable_receivable_nodes);
if (!mapReceivableNodes.empty()) {
fOnlyPoll = true;
} else if (!mapSendableNodes.empty() && !mapNodesWithDataToSend.empty()) {
// we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single
// node would be able to make the network thread busy with polling
for (auto& p : mapNodesWithDataToSend) {
if (mapSendableNodes.count(p.first)) {
fOnlyPoll = true;
break;
return true;
} else if (!mapSendableNodes.empty()) {
if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) {
// we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single
// node would be able to make the network thread busy with polling
for (auto& p : mapNodesWithDataToSend) {
if (mapSendableNodes.count(p.first)) {
return true;
break;
}
}
}
}
}
return false;
}();

std::set<SOCKET> recv_set, send_set, error_set;
SocketEvents(recv_set, send_set, error_set, fOnlyPoll);
Expand Down Expand Up @@ -1829,7 +1834,7 @@ void CConnman::SocketHandler()
std::vector<CNode*> vReceivableNodes;
std::vector<CNode*> vSendableNodes;
{
LOCK(cs_vNodes);
LOCK(cs_mapSocketToNode);
for (auto hSocket : error_set) {
auto it = mapSocketToNode.find(hSocket);
if (it == mapSocketToNode.end()) {
Expand All @@ -1849,6 +1854,7 @@ void CConnman::SocketHandler()
continue;
}

LOCK(cs_sendable_receivable_nodes);
auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second);
assert(jt.first->second == it->second);
it->second->fHasRecvData = true;
Expand All @@ -1859,31 +1865,36 @@ void CConnman::SocketHandler()
continue;
}

LOCK(cs_sendable_receivable_nodes);
auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second);
assert(jt.first->second == it->second);
it->second->fCanSendData = true;
}

// collect nodes that have a receivable socket
// also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
vReceivableNodes.reserve(mapReceivableNodes.size());
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
if (!it->second->fHasRecvData) {
it = mapReceivableNodes.erase(it);
} else {
// Implement the following logic:
// * If there is data to send, try sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
// receiving data (which should succeed as the socket signalled as receivable).
if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
it->second->AddRef();
vReceivableNodes.emplace_back(it->second);
{
LOCK(cs_sendable_receivable_nodes);

vReceivableNodes.reserve(mapReceivableNodes.size());
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
if (!it->second->fHasRecvData) {
it = mapReceivableNodes.erase(it);
} else {
// Implement the following logic:
// * If there is data to send, try sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
// receiving data (which should succeed as the socket signalled as receivable).
if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
it->second->AddRef();
vReceivableNodes.emplace_back(it->second);
}
++it;
}
++it;
}
}

Expand Down Expand Up @@ -1947,7 +1958,7 @@ void CConnman::SocketHandler()
}

{
LOCK(cs_vNodes);
LOCK(cs_sendable_receivable_nodes);
// remove nodes from mapSendableNodes, so that the next iteration knows that there is no work to do
// (even if there are pending messages to be sent)
for (auto it = mapSendableNodes.begin(); it != mapSendableNodes.end(); ) {
Expand Down Expand Up @@ -2901,7 +2912,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
pnode->m_masternode_probe_connection = true;

{
LOCK2(cs_vNodes, pnode->cs_hSocket);
LOCK2(cs_mapSocketToNode, pnode->cs_hSocket);
mapSocketToNode.emplace(pnode->hSocket, pnode);
}

Expand Down Expand Up @@ -3516,9 +3527,9 @@ void CConnman::StopNodes()
for (CNode* pnode : vNodesDisconnected) {
DeleteNode(pnode);
}
mapSocketToNode.clear();
WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.clear());
{
LOCK(cs_vNodes);
LOCK(cs_sendable_receivable_nodes);
mapReceivableNodes.clear();
}
{
Expand Down
9 changes: 5 additions & 4 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,8 @@ friend class CNode;
std::set<uint256> masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes);
std::vector<CNode*> vNodes GUARDED_BY(cs_vNodes);
std::list<CNode*> vNodesDisconnected;
std::unordered_map<SOCKET, CNode*> mapSocketToNode;
mutable Mutex cs_mapSocketToNode;
std::unordered_map<SOCKET, CNode*> mapSocketToNode GUARDED_BY(cs_mapSocketToNode);
mutable RecursiveMutex cs_vNodes;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
Expand Down Expand Up @@ -1484,9 +1485,9 @@ friend class CNode;
int epollfd{-1};
#endif

/** Protected by cs_vNodes */
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_vNodes);
std::unordered_map<NodeId, CNode*> mapSendableNodes GUARDED_BY(cs_vNodes);
Mutex cs_sendable_receivable_nodes;
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);
std::unordered_map<NodeId, CNode*> mapSendableNodes GUARDED_BY(cs_sendable_receivable_nodes);
/** Protected by cs_mapNodesWithDataToSend */
std::unordered_map<NodeId, CNode*> mapNodesWithDataToSend GUARDED_BY(cs_mapNodesWithDataToSend);
mutable RecursiveMutex cs_mapNodesWithDataToSend;
Expand Down

0 comments on commit fbcef10

Please sign in to comment.