diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 19348ba303b8f..023d0603e99f6 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -141,9 +142,9 @@ PeerMsgRet CGovernanceManager::ProcessMessage(CNode& peer, CConnman& connman, Pe LogPrint(BCLog::GOBJECT, "MNGOVERNANCESYNC -- syncing governance objects to our peer %s\n", peer.GetLogString()); if (nProp == uint256()) { - return SyncObjects(peer, connman); + return SyncObjects(peer, peerman, connman); } else { - SyncSingleObjVotes(peer, nProp, filter, connman); + SyncSingleObjVotes(peer, peerman, nProp, filter, connman); } } @@ -858,7 +859,7 @@ bool CGovernanceManager::ConfirmInventoryRequest(const CInv& inv) return true; } -void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, const CBloomFilter& filter, CConnman& connman) +void CGovernanceManager::SyncSingleObjVotes(CNode& peer, PeerManager& peerman, const uint256& nProp, const CBloomFilter& filter, CConnman& connman) { // do not provide any data until our node is synced if (!Assert(m_mn_sync)->IsSynced()) return; @@ -899,7 +900,7 @@ void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, c if (filter.contains(nVoteHash) || !vote.IsValid(tip_mn_list, onlyVotingKeyAllowed)) { continue; } - peer.PushInventory(CInv(MSG_GOVERNANCE_OBJECT_VOTE, nVoteHash)); + peerman.PushInventory(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nVoteHash)); ++nVoteCount; } @@ -908,7 +909,7 @@ void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, c LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- sent %d votes to peer=%d\n", __func__, nVoteCount, peer.GetId()); } -PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, CConnman& connman) const +PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, PeerManager& peerman, CConnman& connman) const { assert(m_netfulfilledman.IsValid()); @@ -959,7 +960,7 @@ PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, CConnman& connman) const // Push the inventory budget proposal message over to the other client LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- syncing govobj: %s, peer=%d\n", __func__, strHash, peer.GetId()); - peer.PushInventory(CInv(MSG_GOVERNANCE_OBJECT, nHash)); + peerman.PushInventory(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash)); ++nObjCount; } diff --git a/src/governance/governance.h b/src/governance/governance.h index ddb859603e99a..0747e5dafaafd 100644 --- a/src/governance/governance.h +++ b/src/governance/governance.h @@ -292,8 +292,8 @@ class CGovernanceManager : public GovernanceStore */ bool ConfirmInventoryRequest(const CInv& inv); - void SyncSingleObjVotes(CNode& peer, const uint256& nProp, const CBloomFilter& filter, CConnman& connman); - PeerMsgRet SyncObjects(CNode& peer, CConnman& connman) const; + void SyncSingleObjVotes(CNode& peer, PeerManager& peerman, const uint256& nProp, const CBloomFilter& filter, CConnman& connman); + PeerMsgRet SyncObjects(CNode& peer, PeerManager& peerman, CConnman& connman) const; PeerMsgRet ProcessMessage(CNode& peer, CConnman& connman, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv); diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index 919c9dce8c61e..220cd2de11562 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -29,13 +29,13 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CDeterminis llmq::quorumBlockProcessor = std::make_unique(chainstate, dmnman, evo_db, peerman); return llmq::quorumBlockProcessor.get(); }()}, - qdkgsman{std::make_unique(*bls_worker, chainstate, connman, dmnman, *dkg_debugman, mn_metaman, *quorum_block_processor, mn_activeman, sporkman, unit_tests, wipe)}, + qdkgsman{std::make_unique(*bls_worker, chainstate, connman, dmnman, *dkg_debugman, mn_metaman, *quorum_block_processor, mn_activeman, sporkman, peerman, unit_tests, wipe)}, qman{[&]() -> llmq::CQuorumManager* const { assert(llmq::quorumManager == nullptr); llmq::quorumManager = std::make_unique(*bls_worker, chainstate, connman, dmnman, *qdkgsman, evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman); return llmq::quorumManager.get(); }()}, - sigman{std::make_unique(connman, mn_activeman, *llmq::quorumManager, unit_tests, wipe)}, + sigman{std::make_unique(connman, mn_activeman, *llmq::quorumManager, peerman, unit_tests, wipe)}, shareman{std::make_unique(connman, *sigman, mn_activeman, *llmq::quorumManager, sporkman, peerman)}, clhandler{[&]() -> llmq::CChainLocksHandler* const { assert(llmq::chainLocksHandler == nullptr); diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index ead2adab8fdeb..bcbf91fe7e0a6 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -1326,10 +1327,10 @@ void CDKGSession::RelayInvToParticipants(const CInv& inv) const myProTxHash.ToString().substr(0, 4), ss.str()); std::stringstream ss2; - connman.ForEachNode([&](CNode* pnode) { + connman.ForEachNode([&](const CNode* pnode) { if (pnode->qwatch || (!pnode->GetVerifiedProRegTxHash().IsNull() && (relayMembers.count(pnode->GetVerifiedProRegTxHash()) != 0))) { - pnode->PushInventory(inv); + Assert(m_peerman)->PushInventory(pnode->GetId(), inv); } if (pnode->GetVerifiedProRegTxHash().IsNull()) { diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index 5b19d6a0973da..a5b58d150012e 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -22,6 +22,7 @@ class CDeterministicMN; class CMasternodeMetaMan; class CSporkManager; class UniValue; +class PeerManager; using CDeterministicMNCPtr = std::shared_ptr; @@ -280,6 +281,7 @@ class CDKGSession CMasternodeMetaMan& m_mn_metaman; const CActiveMasternodeManager* const m_mn_activeman; const CSporkManager& m_sporkman; + const std::unique_ptr& m_peerman; const CBlockIndex* m_quorum_base_block_index{nullptr}; int quorumIndex{0}; @@ -322,9 +324,10 @@ class CDKGSession CDKGSession(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman) : + const CSporkManager& sporkman, const std::unique_ptr& peerman) : params(_params), blsWorker(_blsWorker), cache(_blsWorker), connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager), - dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman) {} + dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman), + m_peerman(peerman) {} bool Init(gsl::not_null pQuorumBaseBlockIndex, Span mns, const uint256& _myProTxHash, int _quorumIndex); diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index 7b48885bded87..4a95aefde2246 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -27,7 +27,7 @@ namespace llmq CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex) : + const CSporkManager& sporkman, const std::unique_ptr& peerman, const Consensus::LLMQParams& _params, int _quorumIndex) : blsWorker(_blsWorker), m_chainstate(chainstate), connman(_connman), @@ -38,9 +38,10 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai quorumBlockProcessor(_quorumBlockProcessor), m_mn_activeman(mn_activeman), m_sporkman(sporkman), + m_peerman(peerman), params(_params), quorumIndex(_quorumIndex), - curSession(std::make_unique(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman)), + curSession(std::make_unique(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman, peerman)), pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT), pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION), @@ -188,7 +189,7 @@ void CDKGSessionHandler::StopThread() bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex) { - curSession = std::make_unique(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman); + curSession = std::make_unique(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman, m_peerman); if (!DeploymentDIP0003Enforced(pQuorumBaseBlockIndex->nHeight, Params().GetConsensus())) { return false; diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index fb509be8ec8b8..e7153bc1fcad6 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -130,6 +130,7 @@ class CDKGSessionHandler CQuorumBlockProcessor& quorumBlockProcessor; const CActiveMasternodeManager* const m_mn_activeman; const CSporkManager& m_sporkman; + const std::unique_ptr& m_peerman; const Consensus::LLMQParams params; const int quorumIndex; @@ -151,7 +152,7 @@ class CDKGSessionHandler CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex); + const CSporkManager& sporkman, const std::unique_ptr& peerman, const Consensus::LLMQParams& _params, int _quorumIndex); ~CDKGSessionHandler() = default; void UpdatedBlockTip(const CBlockIndex *pindexNew); diff --git a/src/llmq/dkgsessionmgr.cpp b/src/llmq/dkgsessionmgr.cpp index 9a618440736f1..ede710a0e9031 100644 --- a/src/llmq/dkgsessionmgr.cpp +++ b/src/llmq/dkgsessionmgr.cpp @@ -26,7 +26,8 @@ static const std::string DB_ENC_CONTRIB = "qdkg_E"; CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe) : + const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, + const std::unique_ptr& peerman, bool unitTests, bool fWipe) : db(std::make_unique(unitTests ? "" : (GetDataDir() / "llmq/dkgdb"), 1 << 20, unitTests, fWipe)), blsWorker(_blsWorker), m_chainstate(chainstate), @@ -49,7 +50,8 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai for (const auto i : irange::range(session_count)) { dkgSessionHandlers.emplace(std::piecewise_construct, std::forward_as_tuple(params.type, i), - std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman, quorumBlockProcessor, mn_activeman, spork_manager, params, i)); + std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman, + quorumBlockProcessor, mn_activeman, spork_manager, peerman, params, i)); } } } diff --git a/src/llmq/dkgsessionmgr.h b/src/llmq/dkgsessionmgr.h index 6077b8373cfa6..92094063a8344 100644 --- a/src/llmq/dkgsessionmgr.h +++ b/src/llmq/dkgsessionmgr.h @@ -69,7 +69,8 @@ class CDKGSessionManager public: CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe); + const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, + const std::unique_ptr& peerman, bool unitTests, bool fWipe); ~CDKGSessionManager() = default; void StartThreads(); diff --git a/src/llmq/instantsend.cpp b/src/llmq/instantsend.cpp index 76d42113cb19b..3487d33f86025 100644 --- a/src/llmq/instantsend.cpp +++ b/src/llmq/instantsend.cpp @@ -1452,12 +1452,9 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnma if (nodesToAskFor.size() >= 4) { return; } - if (peerman.CanRelayAddrs(pnode->GetId())) { - LOCK(pnode->m_tx_relay->cs_tx_inventory); - if (pnode->m_tx_relay->filterInventoryKnown.contains(txid)) { - pnode->AddRef(); - nodesToAskFor.emplace_back(pnode); - } + if (peerman.IsInvInFilter(pnode->GetId(), txid)) { + pnode->AddRef(); + nodesToAskFor.emplace_back(pnode); } }; diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 4e4590c4de52e..f7e3a08881050 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -540,8 +540,8 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge) ////////////////// CSigningManager::CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman, - bool fMemory, bool fWipe) : - db(fMemory, fWipe), connman(_connman), m_mn_activeman(mn_activeman), qman(_qman) + const std::unique_ptr& peerman, bool fMemory, bool fWipe) : + db(fMemory, fWipe), connman(_connman), m_mn_activeman(mn_activeman), qman(_qman), m_peerman(peerman) { } @@ -572,18 +572,18 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS return true; } -PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) { if (msg_type == NetMsgType::QSIGREC) { auto recoveredSig = std::make_shared(); vRecv >> *recoveredSig; - return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig); + return ProcessMessageRecoveredSig(pfrom, recoveredSig); } return {}; } -PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null peerman, const std::shared_ptr& recoveredSig) +PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig) { { LOCK(cs_main); @@ -615,12 +615,6 @@ PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, gsl:: return {}; } - if (m_peerman == nullptr) { - m_peerman = peerman; - } - // we should never use one CSigningManager with different PeerManager - assert(m_peerman == peerman); - pendingRecoveredSigs[pfrom.GetId()].emplace_back(recoveredSig); return {}; } @@ -776,7 +770,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs() if (batchVerifier.badSources.count(nodeId)) { LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); - m_peerman.load()->Misbehaving(nodeId, 100); + Assert(m_peerman)->Misbehaving(nodeId, 100); continue; } @@ -840,9 +834,9 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash()); - connman.ForEachNode([&](CNode* pnode) { + connman.ForEachNode([&](const CNode* pnode) { if (pnode->fSendRecSigs) { - pnode->PushInventory(inv); + Assert(m_peerman)->PushInventory(pnode->GetId(), inv); } }); } diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 92102dbe855b3..847b9685851b7 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -165,8 +165,7 @@ class CSigningManager CConnman& connman; const CActiveMasternodeManager* const m_mn_activeman; const CQuorumManager& qman; - - std::atomic m_peerman{nullptr}; + const std::unique_ptr& m_peerman; // Incoming and not verified yet std::unordered_map>> pendingRecoveredSigs GUARDED_BY(cs); @@ -179,12 +178,13 @@ class CSigningManager std::vector recoveredSigsListeners GUARDED_BY(cs); public: - CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman, bool fMemory, bool fWipe); + CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman, + const std::unique_ptr& peerman, bool fMemory, bool fWipe); bool AlreadyHave(const CInv& inv) const; bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const; - PeerMsgRet ProcessMessage(const CNode& pnode, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv); // This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid // This is the case for example when a signature appears as part of InstantSend or ChainLocks @@ -197,7 +197,7 @@ class CSigningManager void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); private: - PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null peerman, const std::shared_ptr& recoveredSig); + PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig); static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CRecoveredSig& recoveredSig, bool& retBan); void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, diff --git a/src/net.cpp b/src/net.cpp index cfc7f85f0d0df..e4680255e282f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -654,12 +654,6 @@ void CNode::copyStats(CNodeStats &stats, const std::vector &m_asmap) X(addrBind); stats.m_network = ConnectedThroughNetwork(); stats.m_mapped_as = addr.GetMappedAS(m_asmap); - if (!IsBlockOnlyConn()) { - LOCK(m_tx_relay->cs_filter); - stats.fRelayTxes = m_tx_relay->fRelayTxes; - } else { - stats.fRelayTxes = false; - } X(m_last_send); X(m_last_recv); X(nLastTXTime); @@ -957,7 +951,7 @@ static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEviction { // There is a fall-through here because it is common for a node to have more than a few peers that have not yet relayed txn. if (a.nLastTXTime != b.nLastTXTime) return a.nLastTXTime < b.nLastTXTime; - if (a.fRelayTxes != b.fRelayTxes) return b.fRelayTxes; + if (a.m_relay_txs != b.m_relay_txs) return b.m_relay_txs; if (a.fBloomFilter != b.fBloomFilter) return a.fBloomFilter; return a.nTimeConnected > b.nTimeConnected; } @@ -965,7 +959,7 @@ static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEviction // Pick out the potential block-relay only peers, and sort them by last block time. static bool CompareNodeBlockRelayOnlyTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) { - if (a.fRelayTxes != b.fRelayTxes) return a.fRelayTxes; + if (a.m_relay_txs != b.m_relay_txs) return a.m_relay_txs; if (a.nLastBlockTime != b.nLastBlockTime) return a.nLastBlockTime < b.nLastBlockTime; if (a.fRelevantServices != b.fRelevantServices) return b.fRelevantServices; return a.nTimeConnected > b.nTimeConnected; @@ -1034,7 +1028,7 @@ void ProtectEvictionCandidatesByRatio(std::vector& vEvict // Protect up to 8 non-tx-relay peers that have sent us novel blocks. const size_t erase_size = std::min(size_t(8), vEvictionCandidates.size()); EraseLastKElements(vEvictionCandidates, CompareNodeBlockRelayOnlyTime, erase_size, - [](const NodeEvictionCandidate& n) { return !n.fRelayTxes && n.fRelevantServices; }); + [](const NodeEvictionCandidate& n) { return !n.m_relay_txs && n.fRelevantServices; }); // Protect 4 nodes that most recently sent us novel blocks. // An attacker cannot manipulate this metric without performing useful work. @@ -1120,18 +1114,11 @@ bool CConnman::AttemptToEvictConnection() } } - bool peer_relay_txes = false; - bool peer_filter_not_null = false; - if (!node->IsBlockOnlyConn()) { - LOCK(node->m_tx_relay->cs_filter); - peer_relay_txes = node->m_tx_relay->fRelayTxes; - peer_filter_not_null = node->m_tx_relay->pfilter != nullptr; - } NodeEvictionCandidate candidate = {node->GetId(), node->nTimeConnected, node->m_min_ping_time, node->nLastBlockTime, node->nLastTXTime, HasAllDesirableServiceFlags(node->nServices), - peer_relay_txes, peer_filter_not_null, node->nKeyedNetGroup, - node->m_prefer_evict, node->addr.IsLocal(), + node->m_relays_txs.load(), node->m_bloom_filter_loaded.load(), + node->nKeyedNetGroup, node->m_prefer_evict, node->addr.IsLocal(), node->m_inbound_onion}; vEvictionCandidates.push_back(candidate); } diff --git a/src/net.h b/src/net.h index ad61dcbecbfd1..59057603253cb 100644 --- a/src/net.h +++ b/src/net.h @@ -275,7 +275,6 @@ class CNodeStats public: NodeId nodeid; ServiceFlags nServices; - bool fRelayTxes; std::chrono::seconds m_last_send; std::chrono::seconds m_last_recv; int64_t nLastTXTime; @@ -581,34 +580,16 @@ class CNode assert(false); } - struct TxRelay { - mutable RecursiveMutex cs_filter; - // We use fRelayTxes for two purposes - - // a) it allows us to not relay tx invs before receiving the peer's version message - // b) the peer may tell us in its version message that we should not relay tx invs - // unless it loads a bloom filter. - bool fRelayTxes GUARDED_BY(cs_filter){false}; - std::unique_ptr pfilter PT_GUARDED_BY(cs_filter) GUARDED_BY(cs_filter){nullptr}; - - mutable RecursiveMutex cs_tx_inventory; - // inventory based relay - CRollingBloomFilter filterInventoryKnown GUARDED_BY(cs_tx_inventory){50000, 0.000001}; - // Set of transaction ids we still have to announce. - // They are sorted by the mempool before relay, so the order is not important. - std::set setInventoryTxToSend GUARDED_BY(cs_tx_inventory); - // List of non-tx/non-block inventory items - std::vector vInventoryOtherToSend GUARDED_BY(cs_tx_inventory); - // Used for BIP35 mempool sending, also protected by cs_tx_inventory - bool fSendMempool GUARDED_BY(cs_tx_inventory){false}; - // Last time a "MEMPOOL" request was serviced. - std::atomic m_last_mempool_req{0s}; - std::chrono::microseconds nNextInvSend{0}; - }; +public: + /** Whether we should relay transactions to this peer (their version + * message did not include fRelay=false and this is not a block-relay-only + * connection). This only changes from false to true. It will never change + * back to false. Used only in inbound eviction logic. */ + std::atomic_bool m_relays_txs{false}; - // in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer - // in dash: m_tx_relay should never be nullptr, we don't relay transactions if - // `IsBlockOnlyConn() == true` is instead - std::unique_ptr m_tx_relay{std::make_unique()}; + /** Whether this peer has loaded a bloom filter. Used only in inbound + * eviction logic. */ + std::atomic_bool m_bloom_filter_loaded{false}; /** UNIX epoch time of the last block received from this peer that we had * not yet seen (e.g. not already received from another peer), that passed @@ -695,33 +676,6 @@ class CNode nRefCount--; } - void AddKnownInventory(const uint256& hash) - { - LOCK(m_tx_relay->cs_tx_inventory); - m_tx_relay->filterInventoryKnown.insert(hash); - } - - void PushInventory(const CInv& inv) - { - ASSERT_IF_DEBUG(inv.type != MSG_BLOCK); - if (inv.type == MSG_BLOCK) { - LogPrintf("%s -- WARNING: using PushInventory for BLOCK inv, peer=%d\n", __func__, id); - return; - } - - LOCK(m_tx_relay->cs_tx_inventory); - if (m_tx_relay->filterInventoryKnown.contains(inv.hash)) { - LogPrint(BCLog::NET, "%s -- skipping known inv: %s peer=%d\n", __func__, inv.ToString(), id); - return; - } - LogPrint(BCLog::NET, "%s -- adding new inv: %s peer=%d\n", __func__, inv.ToString(), id); - if (inv.type == MSG_TX || inv.type == MSG_DSTX) { - m_tx_relay->setInventoryTxToSend.insert(inv.hash); - return; - } - m_tx_relay->vInventoryOtherToSend.push_back(inv); - } - void CloseSocketDisconnect(CConnman* connman); void copyStats(CNodeStats &stats, const std::vector &m_asmap); @@ -1549,7 +1503,7 @@ struct NodeEvictionCandidate int64_t nLastBlockTime; int64_t nLastTXTime; bool fRelevantServices; - bool fRelayTxes; + bool m_relay_txs; bool fBloomFilter; uint64_t nKeyedNetGroup; bool prefer_evict; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index c93a6d0a4bb3e..ad5f7ce294093 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -262,6 +262,35 @@ struct Peer { /** Whether a ping has been requested by the user */ std::atomic m_ping_queued{false}; + struct TxRelay { + mutable RecursiveMutex m_bloom_filter_mutex; + // We use m_relay_txs for two purposes - + // a) it allows us to not relay tx invs before receiving the peer's version message + // b) the peer may tell us in its version message that we should not relay tx invs + // unless it loads a bloom filter. + bool m_relay_txs GUARDED_BY(m_bloom_filter_mutex){false}; + std::unique_ptr m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr}; + + mutable RecursiveMutex m_tx_inventory_mutex; + // inventory based relay + CRollingBloomFilter m_tx_inventory_known_filter GUARDED_BY(m_tx_inventory_mutex){50000, 0.000001}; + // Set of transaction ids we still have to announce. + // They are sorted by the mempool before relay, so the order is not important. + std::set m_tx_inventory_to_send GUARDED_BY(m_tx_inventory_mutex); + // List of non-tx/non-block inventory items + std::vector vInventoryOtherToSend GUARDED_BY(m_tx_inventory_mutex); + // Used for BIP35 mempool sending, also protected by m_tx_inventory_mutex + bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false}; + // Last time a "MEMPOOL" request was serviced. + std::atomic m_last_mempool_req{0s}; + std::chrono::microseconds m_next_inv_send_time{0}; + }; + + // in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer + // in dash: m_tx_relay should never be nullptr, we don't relay transactions if + // `IsBlockOnlyConn() == true` is instead + std::unique_ptr m_tx_relay{std::make_unique()}; + /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */ std::vector m_addrs_to_send; /** Probabilistic filter to track recent addr messages relayed with this @@ -289,6 +318,8 @@ struct Peer { * This field must correlate with whether m_addr_known has been * initialized.*/ std::atomic_bool m_addr_relay_enabled{false}; + /** Whether a Peer can only be relayed blocks */ + const bool m_block_relay_only{false}; /** Whether a getaddr request to this peer is outstanding. */ bool m_getaddr_sent{false}; /** Guards address sending timers. */ @@ -320,8 +351,10 @@ struct Peer { /** Work queue of items requested by this peer **/ std::deque m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); - explicit Peer(NodeId id) + explicit Peer(NodeId id, bool block_relay_only) : m_id(id) + , m_tx_relay(std::make_unique()) + , m_block_relay_only{block_relay_only} {} }; @@ -358,6 +391,7 @@ class PeerManagerImpl final : public PeerManager bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override; bool IgnoresIncomingTxs() override { return m_ignore_incoming_txs; } void SendPings() override; + void PushInventory(NodeId nodeid, const CInv& inv) override; void RelayInv(CInv &inv, const int minProtoVersion) override; void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion) override; void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion) override; @@ -367,7 +401,7 @@ class PeerManagerImpl final : public PeerManager void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override; bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main); - bool CanRelayAddrs(NodeId pnode) const override; + bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override; private: /** Helper to process result of external handlers of message */ @@ -430,7 +464,7 @@ class PeerManagerImpl final : public PeerManager void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req); /** Send a version message to a peer */ - void PushNodeVersion(CNode& pnode); + void PushNodeVersion(CNode& pnode, Peer& peer); /** Send a ping message every PING_INTERVAL or if requested via RPC. May * mark the peer to be disconnected if a ping has timed out. @@ -902,6 +936,39 @@ static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& ins } } +static void AddKnownInv(Peer& peer, const uint256& hash) +{ + // Dash always initializes m_tx_relay + assert(peer.m_tx_relay != nullptr); + + LOCK(peer.m_tx_relay->m_tx_inventory_mutex); + peer.m_tx_relay->m_tx_inventory_known_filter.insert(hash); +} + +static void PushInv(Peer& peer, const CInv& inv) +{ + // Dash always initializes m_tx_relay + assert(peer.m_tx_relay != nullptr); + + ASSERT_IF_DEBUG(inv.type != MSG_BLOCK); + if (inv.type == MSG_BLOCK) { + LogPrintf("%s -- WARNING: using PushInv for BLOCK inv, peer=%d\n", __func__, peer.m_id); + return; + } + + LOCK(peer.m_tx_relay->m_tx_inventory_mutex); + if (peer.m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) { + LogPrint(BCLog::NET, "%s -- skipping known inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id); + return; + } + LogPrint(BCLog::NET, "%s -- adding new inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id); + if (inv.type == MSG_TX || inv.type == MSG_DSTX) { + peer.m_tx_relay->m_tx_inventory_to_send.insert(inv.hash); + return; + } + peer.m_tx_relay->vInventoryOtherToSend.push_back(inv); +} + static void UpdatePreferredDownload(const CNode& node, CNodeState* state) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { nPreferredDownload -= state->fPreferredDownload; @@ -1148,7 +1215,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(NodeId nodeid, unsigned int count } } // namespace -void PeerManagerImpl::PushNodeVersion(CNode& pnode) +void PeerManagerImpl::PushNodeVersion(CNode& pnode, Peer& peer) { const auto& params = Params(); @@ -1343,13 +1410,13 @@ void PeerManagerImpl::InitializeNode(CNode *pnode) { LOCK(cs_main); mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(pnode->IsInboundConn())); } + PeerRef peer = std::make_shared(nodeid, /* block_relay_only = */ pnode->IsBlockOnlyConn()); { - PeerRef peer = std::make_shared(nodeid); LOCK(m_peer_mutex); - m_peer_map.emplace_hint(m_peer_map.end(), nodeid, std::move(peer)); + m_peer_map.emplace_hint(m_peer_map.end(), nodeid, peer); } if (!pnode->IsInboundConn()) { - PushNodeVersion(*pnode); + PushNodeVersion(*pnode, *peer); } } @@ -1474,6 +1541,12 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c ping_wait = GetTime() - peer->m_ping_start.load(); } + if (!peer->m_block_relay_only) { + stats.m_relay_txs = WITH_LOCK(peer->m_tx_relay->m_bloom_filter_mutex, return peer->m_tx_relay->m_relay_txs); + } else { + stats.m_relay_txs = false; + } + stats.m_ping_wait = ping_wait; stats.m_addr_processed = peer->m_addr_processed.load(); stats.m_addr_rate_limited = peer->m_addr_rate_limited.load(); @@ -1662,14 +1735,6 @@ bool PeerManagerImpl::IsBanned(NodeId pnode) return false; } -bool PeerManagerImpl::CanRelayAddrs(NodeId pnode) const -{ - PeerRef peer = GetPeerRef(pnode); - if (peer == nullptr) - return false; - return peer->m_addr_relay_enabled; -} - bool PeerManagerImpl::MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& state, bool via_compact_block, const std::string& message) { @@ -2113,59 +2178,95 @@ void PeerManagerImpl::SendPings() for(auto& it : m_peer_map) it.second->m_ping_queued = true; } -void PeerManagerImpl::RelayInv(CInv &inv, const int minProtoVersion) { +bool PeerManagerImpl::IsInvInFilter(NodeId nodeid, const uint256& hash) const +{ + PeerRef peer = GetPeerRef(nodeid); + if (peer == nullptr) + return false; + if (peer->m_block_relay_only) + return false; + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); + return peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash); +} + +void PeerManagerImpl::PushInventory(NodeId nodeid, const CInv& inv) +{ + // TODO: Get rid of this function at some point + PeerRef peer = GetPeerRef(nodeid); + if (peer == nullptr) + return; + PushInv(*peer, inv); +} + +void PeerManagerImpl::RelayInv(CInv &inv, const int minProtoVersion) +{ + // TODO: Migrate to iteration through m_peer_map m_connman.ForEachNode([&](CNode* pnode) { if (pnode->nVersion < minProtoVersion || !pnode->CanRelay()) return; - pnode->PushInventory(inv); + + PeerRef peer = GetPeerRef(pnode->GetId()); + if (peer == nullptr) return; + PushInv(*peer, inv); }); } void PeerManagerImpl::RelayInvFiltered(CInv &inv, const CTransaction& relatedTx, const int minProtoVersion) { + // TODO: Migrate to iteration through m_peer_map m_connman.ForEachNode([&](CNode* pnode) { if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || pnode->IsBlockOnlyConn()) { return; } + + PeerRef peer = GetPeerRef(pnode->GetId()); + if (peer == nullptr) return; { - LOCK(pnode->m_tx_relay->cs_filter); - if (!pnode->m_tx_relay->fRelayTxes) { + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (!peer->m_tx_relay->m_relay_txs) { return; } - if (pnode->m_tx_relay->pfilter && !pnode->m_tx_relay->pfilter->IsRelevantAndUpdate(relatedTx)) { + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(relatedTx)) { return; } } - pnode->PushInventory(inv); + PushInv(*peer, inv); }); } void PeerManagerImpl::RelayInvFiltered(CInv &inv, const uint256& relatedTxHash, const int minProtoVersion) { + // TODO: Migrate to iteration through m_peer_map m_connman.ForEachNode([&](CNode* pnode) { if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || pnode->IsBlockOnlyConn()) { return; } + + PeerRef peer = GetPeerRef(pnode->GetId()); + if (peer == nullptr) return; { - LOCK(pnode->m_tx_relay->cs_filter); - if (!pnode->m_tx_relay->fRelayTxes) { + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (!peer->m_tx_relay->m_relay_txs) { return; } - if (pnode->m_tx_relay->pfilter && !pnode->m_tx_relay->pfilter->contains(relatedTxHash)) { + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->contains(relatedTxHash)) { return; } } - pnode->PushInventory(inv); + PushInv(*peer, inv); }); } void PeerManagerImpl::RelayTransaction(const uint256& txid) { - CInv inv(m_cj_ctx->dstxman->GetDSTX(txid) ? MSG_DSTX : MSG_TX, txid); - m_connman.ForEachNode([&inv](CNode* pnode) - { - pnode->PushInventory(inv); - }); + LOCK(m_peer_mutex); + for(auto& it : m_peer_map) { + Peer& peer = *it.second; + if (!peer.m_tx_relay) continue; + + const CInv inv{m_cj_ctx->dstxman->GetDSTX(txid) ? MSG_DSTX : MSG_TX, txid}; + PushInv(peer, inv); + }; } void PeerManagerImpl::RelayAddress(NodeId originator, @@ -2299,10 +2400,10 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv& bool sendMerkleBlock = false; CMerkleBlock merkleBlock; if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - if (pfrom.m_tx_relay->pfilter) { + LOCK(peer.m_tx_relay->m_bloom_filter_mutex); + if (peer.m_tx_relay->m_bloom_filter) { sendMerkleBlock = true; - merkleBlock = CMerkleBlock(*pblock, *pfrom.m_tx_relay->pfilter); + merkleBlock = CMerkleBlock(*pblock, *peer.m_tx_relay->m_bloom_filter); } } if (sendMerkleBlock) { @@ -2401,7 +2502,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic const std::chrono::seconds now = GetTime(); // Get last mempool request time - const std::chrono::seconds mempool_req = !pfrom.IsBlockOnlyConn() ? pfrom.m_tx_relay->m_last_mempool_req.load() + const std::chrono::seconds mempool_req = !pfrom.IsBlockOnlyConn() ? peer.m_tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min(); // Process as many TX items from the front of the getdata queue as @@ -2463,7 +2564,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic for (const uint256& parent_txid : parent_ids_to_add) { // Relaying a transaction with a recent but unconfirmed parent. - if (WITH_LOCK(pfrom.m_tx_relay->cs_tx_inventory, return !pfrom.m_tx_relay->filterInventoryKnown.contains(parent_txid))) { + if (WITH_LOCK(peer.m_tx_relay->m_tx_inventory_mutex, return !peer.m_tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) { LOCK(cs_main); State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid); } @@ -3222,7 +3323,7 @@ void PeerManagerImpl::ProcessMessage( // Be shy and don't send version until we hear if (pfrom.IsInboundConn()) { - PushNodeVersion(pfrom); + PushNodeVersion(pfrom, *peer); } if (Params().NetworkIDString() == CBaseChainParams::DEVNET) { @@ -3268,8 +3369,9 @@ void PeerManagerImpl::ProcessMessage( pfrom.m_limited_node = (!(nServices & NODE_NETWORK) && (nServices & NODE_NETWORK_LIMITED)); if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - pfrom.m_tx_relay->fRelayTxes = fRelay; // set to true after we get the first filter* message + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + peer->m_tx_relay->m_relay_txs = fRelay; // set to true after we get the first filter* message + if (fRelay) pfrom.m_relays_txs = true; } // Potentially mark this peer as a preferred download peer. @@ -3658,7 +3760,7 @@ void PeerManagerImpl::ProcessMessage( MSG_SPORK }; - pfrom.AddKnownInventory(inv.hash); + AddKnownInv(*peer, inv.hash); if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) { LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId()); pfrom.fDisconnect = true; @@ -3934,7 +4036,7 @@ void PeerManagerImpl::ProcessMessage( const CTransaction& tx = *ptx; const uint256& txid = ptx->GetHash(); - pfrom.AddKnownInventory(txid); + AddKnownInv(*peer, txid); CInv inv(nInvType, tx.GetHash()); { @@ -4028,11 +4130,11 @@ void PeerManagerImpl::ProcessMessage( for (const uint256& parent_txid : unique_parents) { CInv _inv(MSG_TX, parent_txid); - pfrom.AddKnownInventory(_inv.hash); + AddKnownInv(*peer, _inv.hash); if (!AlreadyHave(_inv)) RequestObject(State(pfrom.GetId()), _inv, current_time, is_masternode); // We don't know if the previous tx was a regular or a mixing one, try both CInv _inv2(MSG_DSTX, parent_txid); - pfrom.AddKnownInventory(_inv2.hash); + AddKnownInv(*peer, _inv2.hash); if (!AlreadyHave(_inv2)) RequestObject(State(pfrom.GetId()), _inv2, current_time, is_masternode); } AddOrphanTx(ptx, pfrom.GetId()); @@ -4490,8 +4592,8 @@ void PeerManagerImpl::ProcessMessage( } if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_tx_inventory); - pfrom.m_tx_relay->fSendMempool = true; + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); + peer->m_tx_relay->m_send_mempool = true; } return; } @@ -4585,9 +4687,11 @@ void PeerManagerImpl::ProcessMessage( } else if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - pfrom.m_tx_relay->pfilter.reset(new CBloomFilter(filter)); - pfrom.m_tx_relay->fRelayTxes = true; + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + peer->m_tx_relay->m_bloom_filter.reset(new CBloomFilter(filter)); + pfrom.m_bloom_filter_loaded = true; + peer->m_tx_relay->m_relay_txs = true; + pfrom.m_relays_txs = true; } return; } @@ -4607,9 +4711,9 @@ void PeerManagerImpl::ProcessMessage( if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) { bad = true; } else if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - if (pfrom.m_tx_relay->pfilter) { - pfrom.m_tx_relay->pfilter->insert(vData); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (peer->m_tx_relay->m_bloom_filter) { + peer->m_tx_relay->m_bloom_filter->insert(vData); } else { bad = true; } @@ -4629,9 +4733,11 @@ void PeerManagerImpl::ProcessMessage( if (pfrom.IsBlockOnlyConn()) { return; } - LOCK(pfrom.m_tx_relay->cs_filter); - pfrom.m_tx_relay->pfilter = nullptr; - pfrom.m_tx_relay->fRelayTxes = true; + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + peer->m_tx_relay->m_bloom_filter = nullptr; + pfrom.m_bloom_filter_loaded = false; + peer->m_tx_relay->m_relay_txs = true; + pfrom.m_relays_txs = true; return; } @@ -4749,7 +4855,7 @@ void PeerManagerImpl::ProcessMessage( ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, this, is_masternode, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv); - ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, this, msg_type, vRecv), pfrom); + ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->clhandler->ProcessMessage(pfrom, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); return; @@ -5434,8 +5540,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto) size_t reserve = INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000; if (!pto->IsBlockOnlyConn()) { - LOCK(pto->m_tx_relay->cs_tx_inventory); - reserve = std::min(pto->m_tx_relay->setInventoryTxToSend.size(), reserve); + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); + reserve = std::min(peer->m_tx_relay->m_tx_inventory_to_send.size(), reserve); } reserve = std::max(reserve, peer->m_blocks_for_inv_relay.size()); reserve = std::min(reserve, MAX_INV_SZ); @@ -5451,9 +5557,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } peer->m_blocks_for_inv_relay.clear(); - auto queueAndMaybePushInv = [this, pto, &vInv, &msgMaker](const CInv& invIn) { - AssertLockHeld(pto->m_tx_relay->cs_tx_inventory); - pto->m_tx_relay->filterInventoryKnown.insert(invIn.hash); + auto queueAndMaybePushInv = [this, pto, peer, &vInv, &msgMaker](const CInv& invIn) { + AssertLockHeld(peer->m_tx_relay->m_tx_inventory_mutex); + peer->m_tx_relay->m_tx_inventory_known_filter.insert(invIn.hash); LogPrint(BCLog::NET, "SendMessages -- queued inv: %s index=%d peer=%d\n", invIn.ToString(), vInv.size(), pto->GetId()); // Responses to MEMPOOL requests bypass the m_recently_announced_invs filter. vInv.push_back(invIn); @@ -5465,41 +5571,41 @@ bool PeerManagerImpl::SendMessages(CNode* pto) }; if (!pto->IsBlockOnlyConn()) { - LOCK(pto->m_tx_relay->cs_tx_inventory); + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); // Check whether periodic sends should happen // Note: If this node is running in a Masternode mode, it makes no sense to delay outgoing txes // because we never produce any txes ourselves i.e. no privacy is lost in this case. bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan) || is_masternode; - if (pto->m_tx_relay->nNextInvSend < current_time) { + if (peer->m_tx_relay->m_next_inv_send_time < current_time) { fSendTrickle = true; if (pto->IsInboundConn()) { - pto->m_tx_relay->nNextInvSend = m_connman.PoissonNextSendInbound(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL); + peer->m_tx_relay->m_next_inv_send_time = m_connman.PoissonNextSendInbound(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL); } else { // Use half the delay for Masternode outbound peers, as there is less privacy concern for them. - pto->m_tx_relay->nNextInvSend = pto->GetVerifiedProRegTxHash().IsNull() ? - PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) : - PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2); + peer->m_tx_relay->m_next_inv_send_time = pto->GetVerifiedProRegTxHash().IsNull() ? + PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) : + PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2); } } // Time to send but the peer has requested we not relay transactions. if (fSendTrickle) { - LOCK(pto->m_tx_relay->cs_filter); - if (!pto->m_tx_relay->fRelayTxes) pto->m_tx_relay->setInventoryTxToSend.clear(); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (!peer->m_tx_relay->m_relay_txs) peer->m_tx_relay->m_tx_inventory_to_send.clear(); } // Respond to BIP35 mempool requests - if (fSendTrickle && pto->m_tx_relay->fSendMempool) { + if (fSendTrickle && peer->m_tx_relay->m_send_mempool) { auto vtxinfo = m_mempool.infoAll(); - pto->m_tx_relay->fSendMempool = false; + peer->m_tx_relay->m_send_mempool = false; - LOCK(pto->m_tx_relay->cs_filter); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); // Send invs for txes and corresponding IS-locks for (const auto& txinfo : vtxinfo) { const uint256& hash = txinfo.tx->GetHash(); - pto->m_tx_relay->setInventoryTxToSend.erase(hash); - if (pto->m_tx_relay->pfilter && !pto->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; + peer->m_tx_relay->m_tx_inventory_to_send.erase(hash); + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX; queueAndMaybePushInv(CInv(nInvType, hash)); @@ -5516,17 +5622,17 @@ bool PeerManagerImpl::SendMessages(CNode* pto) uint256 chainlockHash = ::SerializeHash(clsig); queueAndMaybePushInv(CInv(MSG_CLSIG, chainlockHash)); } - pto->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast(current_time); + peer->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast(current_time); } // Determine transactions to relay if (fSendTrickle) { - LOCK(pto->m_tx_relay->cs_filter); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); // Produce a vector with all candidates for sending std::vector::iterator> vInvTx; - vInvTx.reserve(pto->m_tx_relay->setInventoryTxToSend.size()); - for (std::set::iterator it = pto->m_tx_relay->setInventoryTxToSend.begin(); it != pto->m_tx_relay->setInventoryTxToSend.end(); it++) { + vInvTx.reserve(peer->m_tx_relay->m_tx_inventory_to_send.size()); + for (std::set::iterator it = peer->m_tx_relay->m_tx_inventory_to_send.begin(); it != peer->m_tx_relay->m_tx_inventory_to_send.end(); it++) { vInvTx.push_back(it); } // Topologically and fee-rate sort the inventory we send for privacy and priority reasons. @@ -5543,9 +5649,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto) vInvTx.pop_back(); uint256 hash = *it; // Remove it from the to-be-sent set - pto->m_tx_relay->setInventoryTxToSend.erase(it); + peer->m_tx_relay->m_tx_inventory_to_send.erase(it); // Check if not in the filter already - if (pto->m_tx_relay->filterInventoryKnown.contains(hash)) { + if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash)) { continue; } // Not in the mempool anymore? don't bother sending it. @@ -5553,7 +5659,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (!txinfo.tx) { continue; } - if (pto->m_tx_relay->pfilter && !pto->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; // Send State(pto->GetId())->m_recently_announced_invs.insert(hash); nRelayedTransactions++; @@ -5578,15 +5684,15 @@ bool PeerManagerImpl::SendMessages(CNode* pto) { // Send non-tx/non-block inventory items - LOCK2(pto->m_tx_relay->cs_tx_inventory, pto->m_tx_relay->cs_filter); + LOCK2(peer->m_tx_relay->m_tx_inventory_mutex, peer->m_tx_relay->m_bloom_filter_mutex); - bool fSendIS = pto->m_tx_relay->fRelayTxes && !pto->IsBlockRelayOnly(); + bool fSendIS = peer->m_tx_relay->m_relay_txs && !pto->IsBlockRelayOnly(); - for (const auto& inv : pto->m_tx_relay->vInventoryOtherToSend) { - if (!pto->m_tx_relay->fRelayTxes && NetMessageViolatesBlocksOnly(inv.GetCommand())) { + for (const auto& inv : peer->m_tx_relay->vInventoryOtherToSend) { + if (!peer->m_tx_relay->m_relay_txs && NetMessageViolatesBlocksOnly(inv.GetCommand())) { continue; } - if (pto->m_tx_relay->filterInventoryKnown.contains(inv.hash)) { + if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) { continue; } if (!fSendIS && inv.type == MSG_ISDLOCK) { @@ -5594,7 +5700,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } queueAndMaybePushInv(inv); } - pto->m_tx_relay->vInventoryOtherToSend.clear(); + peer->m_tx_relay->vInventoryOtherToSend.clear(); } } if (!vInv.empty()) diff --git a/src/net_processing.h b/src/net_processing.h index f5826dd40d21f..47bccd83bf8f9 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -47,6 +47,7 @@ struct CNodeStateStats { int m_starting_height = -1; std::chrono::microseconds m_ping_wait; std::vector vHeightInFlight; + bool m_relay_txs; uint64_t m_addr_processed = 0; uint64_t m_addr_rate_limited = 0; bool m_addr_relay_enabled{false}; @@ -74,6 +75,12 @@ class PeerManager : public CValidationInterface, public NetEventsInterface /** Send ping message to all peers */ virtual void SendPings() = 0; + /** Is an inventory in the known inventory filter. Used by InstantSend. */ + virtual bool IsInvInFilter(NodeId nodeid, const uint256& hash) const = 0; + + /** Broadcast inventory message to a specific peer. */ + virtual void PushInventory(NodeId nodeid, const CInv& inv) = 0; + /** Relay inventories to all peers */ virtual void RelayInv(CInv &inv, const int minProtoVersion = MIN_PEER_PROTO_VERSION) = 0; virtual void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, @@ -110,9 +117,6 @@ class PeerManager : public CValidationInterface, public NetEventsInterface virtual bool IsBanned(NodeId pnode) = 0; - /* Can we send addr messages to a peer. Used by InstantSend. */ - virtual bool CanRelayAddrs(NodeId pnode) const = 0; - /** Whether we've completed initial sync yet, for determining when to turn * on extra block-relay-only peers. */ bool m_initial_sync_finished{false}; diff --git a/src/qt/rpcconsole.cpp b/src/qt/rpcconsole.cpp index 069f98e3ec56f..25d06547ec5fc 100644 --- a/src/qt/rpcconsole.cpp +++ b/src/qt/rpcconsole.cpp @@ -1236,7 +1236,6 @@ void RPCConsole::updateDetailWidget() peerAddrDetails += "
" + tr("via %1").arg(QString::fromStdString(stats->nodeStats.addrLocal)); ui->peerHeading->setText(peerAddrDetails); ui->peerServices->setText(GUIUtil::formatServicesStr(stats->nodeStats.nServices)); - ui->peerRelayTxes->setText(stats->nodeStats.fRelayTxes ? "Yes" : "No"); const auto time_now{GetTime()}; ui->peerConnTime->setText(GUIUtil::formatDurationStr(time_now - std::chrono::seconds{stats->nodeStats.nTimeConnected})); ui->peerLastBlock->setText(TimeDurationField(time_now, std::chrono::seconds{stats->nodeStats.nLastBlockTime})); @@ -1295,6 +1294,7 @@ void RPCConsole::updateDetailWidget() ui->peerAddrRelayEnabled->setText(stats->nodeStateStats.m_addr_relay_enabled ? "Yes" : "No"); ui->peerAddrProcessed->setText(QString::number(stats->nodeStateStats.m_addr_processed)); ui->peerAddrRateLimited->setText(QString::number(stats->nodeStateStats.m_addr_rate_limited)); + ui->peerRelayTxes->setText(stats->nodeStateStats.m_relay_txs ? "Yes" : "No"); } ui->detailWidget->show(); diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index a99003b69702c..b88e9de9c9bcc 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -219,7 +219,6 @@ static RPCHelpMan getpeerinfo() obj.pushKV("verified_pubkey_hash", stats.verifiedPubKeyHash.ToString()); } obj.pushKV("servicesnames", GetServicesNames(stats.nServices)); - obj.pushKV("relaytxes", stats.fRelayTxes); obj.pushKV("lastsend", count_seconds(stats.m_last_send)); obj.pushKV("lastrecv", count_seconds(stats.m_last_recv)); obj.pushKV("last_transaction", stats.nLastTXTime); @@ -258,6 +257,7 @@ static RPCHelpMan getpeerinfo() heights.push_back(height); } obj.pushKV("inflight", heights); + obj.pushKV("relaytxes", statestats.m_relay_txs); obj.pushKV("addr_relay_enabled", statestats.m_addr_relay_enabled); obj.pushKV("addr_processed", statestats.m_addr_processed); obj.pushKV("addr_rate_limited", statestats.m_addr_rate_limited); diff --git a/src/test/fuzz/net.cpp b/src/test/fuzz/net.cpp index 8370ac8aa034c..84d6834af825d 100644 --- a/src/test/fuzz/net.cpp +++ b/src/test/fuzz/net.cpp @@ -60,20 +60,6 @@ FUZZ_TARGET_INIT(net, initialize_net) node.Release(); } }, - [&] { - const std::optional inv_opt = ConsumeDeserializable(fuzzed_data_provider); - if (!inv_opt) { - return; - } - node.AddKnownInventory(inv_opt->hash); - }, - [&] { - const std::optional inv_opt = ConsumeDeserializable(fuzzed_data_provider); - if (!inv_opt) { - return; - } - node.PushInventory(*inv_opt); - }, [&] { const std::optional service_opt = ConsumeDeserializable(fuzzed_data_provider); if (!service_opt) { diff --git a/src/test/fuzz/node_eviction.cpp b/src/test/fuzz/node_eviction.cpp index 70ffc6bf37f25..fc27dbc429817 100644 --- a/src/test/fuzz/node_eviction.cpp +++ b/src/test/fuzz/node_eviction.cpp @@ -26,7 +26,7 @@ FUZZ_TARGET(node_eviction) /* nLastBlockTime */ fuzzed_data_provider.ConsumeIntegral(), /* nLastTXTime */ fuzzed_data_provider.ConsumeIntegral(), /* fRelevantServices */ fuzzed_data_provider.ConsumeBool(), - /* fRelayTxes */ fuzzed_data_provider.ConsumeBool(), + /* m_relay_txs */ fuzzed_data_provider.ConsumeBool(), /* fBloomFilter */ fuzzed_data_provider.ConsumeBool(), /* nKeyedNetGroup */ fuzzed_data_provider.ConsumeIntegral(), /* prefer_evict */ fuzzed_data_provider.ConsumeBool(), diff --git a/src/test/fuzz/util.cpp b/src/test/fuzz/util.cpp index 1251a674e6307..62cc1a281ceff 100644 --- a/src/test/fuzz/util.cpp +++ b/src/test/fuzz/util.cpp @@ -236,10 +236,6 @@ void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, assert(node.nVersion == version); assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION)); assert(node.nServices == remote_services); - if (node.m_tx_relay != nullptr) { - LOCK(node.m_tx_relay->cs_filter); - assert(node.m_tx_relay->fRelayTxes == filter_txs); - } node.m_permissionFlags = permission_flags; if (successfully_connected) { CSerializedNetMsg msg_verack{mm.Make(NetMsgType::VERACK)}; diff --git a/src/test/net_peer_eviction_tests.cpp b/src/test/net_peer_eviction_tests.cpp index 31d391bf7d887..2f2eb29a8bc3f 100644 --- a/src/test/net_peer_eviction_tests.cpp +++ b/src/test/net_peer_eviction_tests.cpp @@ -31,7 +31,7 @@ std::vector GetRandomNodeEvictionCandidates(const int n_c /* nLastBlockTime */ static_cast(random_context.randrange(100)), /* nLastTXTime */ static_cast(random_context.randrange(100)), /* fRelevantServices */ random_context.randbool(), - /* fRelayTxes */ random_context.randbool(), + /* m_relay_txs */ random_context.randbool(), /* fBloomFilter */ random_context.randbool(), /* nKeyedNetGroup */ random_context.randrange(100), /* prefer_evict */ random_context.randbool(), @@ -289,7 +289,7 @@ BOOST_AUTO_TEST_CASE(peer_eviction_test) number_of_nodes, [number_of_nodes](NodeEvictionCandidate& candidate) { candidate.nLastBlockTime = number_of_nodes - candidate.id; if (candidate.id <= 7) { - candidate.fRelayTxes = false; + candidate.m_relay_txs = false; candidate.fRelevantServices = true; } }, @@ -308,7 +308,7 @@ BOOST_AUTO_TEST_CASE(peer_eviction_test) number_of_nodes, [number_of_nodes](NodeEvictionCandidate& candidate) { candidate.nLastBlockTime = number_of_nodes - candidate.id; if (candidate.id <= 7) { - candidate.fRelayTxes = false; + candidate.m_relay_txs = false; candidate.fRelevantServices = true; } }, diff --git a/test/functional/p2p_blocksonly.py b/test/functional/p2p_blocksonly.py index 87197b659b1c6..d23d2c990bc1b 100755 --- a/test/functional/p2p_blocksonly.py +++ b/test/functional/p2p_blocksonly.py @@ -87,7 +87,7 @@ def blocks_relay_conn_tests(self): self.nodes[0].sendrawtransaction(tx_hex) - # Bump time forward to ensure nNextInvSend timer pops + # Bump time forward to ensure m_next_inv_send_time timer pops self.nodes[0].setmocktime(int(time.time()) + 60) conn.sync_send_with_ping() diff --git a/test/functional/wallet_resendwallettransactions.py b/test/functional/wallet_resendwallettransactions.py index 3a8615ac581b8..df7c1ecf6ffc4 100755 --- a/test/functional/wallet_resendwallettransactions.py +++ b/test/functional/wallet_resendwallettransactions.py @@ -40,7 +40,7 @@ def wait_p2p(): return peer_first.tx_invs_received[int(txid, 16)] >= 1 self.wait_until(wait_p2p) - # Add a second peer since txs aren't rebroadcast to the same peer (see filterInventoryKnown) + # Add a second peer since txs aren't rebroadcast to the same peer (see m_tx_inventory_known_filter) peer_second = node.add_p2p_connection(P2PTxInvStore()) self.log.info("Create a block") diff --git a/test/lint/lint-circular-dependencies.sh b/test/lint/lint-circular-dependencies.sh index 7d4ea39a23d05..19f2419b5d6c6 100755 --- a/test/lint/lint-circular-dependencies.sh +++ b/test/lint/lint-circular-dependencies.sh @@ -98,9 +98,10 @@ EXPECTED_CIRCULAR_DEPENDENCIES=( "coinjoin/client -> coinjoin/util -> wallet/wallet -> psbt -> node/transaction -> node/context -> coinjoin/context -> coinjoin/client" "llmq/blockprocessor -> net_processing -> llmq/blockprocessor" "llmq/chainlocks -> net_processing -> llmq/chainlocks" + "llmq/dkgsession -> net_processing -> llmq/quorums -> llmq/dkgsession" "net_processing -> spork -> net_processing" "evo/simplifiedmns -> llmq/blockprocessor -> net_processing -> evo/simplifiedmns" - "governance/governance -> governance/object -> net_processing -> governance/governance" + "governance/governance -> net_processing -> governance/governance" "llmq/blockprocessor -> net_processing -> llmq/context -> llmq/blockprocessor" "llmq/blockprocessor -> net_processing -> llmq/quorums -> llmq/blockprocessor" "llmq/chainlocks -> net_processing -> llmq/context -> llmq/chainlocks"