Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: merge bitcoin#21167, #22782, #21943, #22829, #24079, #24108, #24157, #25109 (network backports: part 5) #6004

Merged
merged 8 commits into from
May 10, 2024
10 changes: 5 additions & 5 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class CCheckQueue
bool m_request_stop GUARDED_BY(m_mutex){false};

/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster)
bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
std::vector<T> vChecks;
Expand Down Expand Up @@ -139,7 +139,7 @@ class CCheckQueue
}

//! Create a pool of new worker threads.
void StartWorkerThreads(const int threads_num)
void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
{
LOCK(m_mutex);
Expand All @@ -157,13 +157,13 @@ class CCheckQueue
}

//! Wait until execution finishes, and return whether all evaluations were successful.
bool Wait()
bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
return Loop(true /* master thread */);
}

//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks)
void Add(std::vector<T>& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
if (vChecks.empty()) {
return;
Expand All @@ -186,7 +186,7 @@ class CCheckQueue
}

//! Stop all of the worker threads.
void StopWorkerThreads()
void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
WITH_LOCK(m_mutex, m_request_stop = true);
m_worker_cv.notify_all();
Expand Down
17 changes: 12 additions & 5 deletions src/coinjoin/coinjoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,22 @@ class CDSTXManager
void AddDSTX(const CCoinJoinBroadcastTx& dstx) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);
CCoinJoinBroadcastTx GetDSTX(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);

void UpdatedBlockTip(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler, const CMasternodeSync& mn_sync);
void NotifyChainLock(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler, const CMasternodeSync& mn_sync);
void UpdatedBlockTip(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler,
const CMasternodeSync& mn_sync)
EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);
void NotifyChainLock(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler,
const CMasternodeSync& mn_sync)
EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);

void TransactionAddedToMempool(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex*) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex*)
EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);

private:
void CheckDSTXes(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler);
void CheckDSTXes(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler)
EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx);
void UpdateDSTXConfirmedHeight(const CTransactionRef& tx, std::optional<int> nHeight);

};
Expand Down
12 changes: 5 additions & 7 deletions src/governance/governance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,11 +1223,11 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH

int CGovernanceManager::RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const
{
std::array<CNode*, 1> nodeCopy{&peer};
return RequestGovernanceObjectVotes(nodeCopy, connman);
const std::vector<CNode*> vNodeCopy{&peer};
return RequestGovernanceObjectVotes(vNodeCopy, connman);
}

int CGovernanceManager::RequestGovernanceObjectVotes(Span<CNode*> vNodesCopy, CConnman& connman) const
int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector<CNode*>& vNodesCopy, CConnman& connman) const
{
static std::map<uint256, std::map<CService, int64_t> > mapAskedRecently;

Expand Down Expand Up @@ -1501,7 +1501,7 @@ void CGovernanceManager::UpdatedBlockTip(const CBlockIndex* pindex, CConnman& co

void CGovernanceManager::RequestOrphanObjects(CConnman& connman)
{
std::vector<CNode*> vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly);
const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly};

std::vector<uint256> vecHashesFiltered;
{
Expand All @@ -1517,15 +1517,13 @@ void CGovernanceManager::RequestOrphanObjects(CConnman& connman)

LogPrint(BCLog::GOBJECT, "CGovernanceObject::RequestOrphanObjects -- number objects = %d\n", vecHashesFiltered.size());
for (const uint256& nHash : vecHashesFiltered) {
for (CNode* pnode : vNodesCopy) {
for (CNode* pnode : snap.Nodes()) {
if (!pnode->CanRelay()) {
continue;
}
RequestGovernanceObject(pnode, nHash, connman);
}
}

connman.ReleaseNodeVector(vNodesCopy);
}

void CGovernanceManager::CleanOrphanObjects()
Expand Down
2 changes: 1 addition & 1 deletion src/governance/governance.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ class CGovernanceManager : public GovernanceStore
void InitOnLoad();

int RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const;
int RequestGovernanceObjectVotes(Span<CNode*> vNodesCopy, CConnman& connman) const;
int RequestGovernanceObjectVotes(const std::vector<CNode*>& vNodesCopy, CConnman& connman) const;

/*
* Trigger Management (formerly CGovernanceTriggerManager)
Expand Down
6 changes: 3 additions & 3 deletions src/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class WorkQueue
{
}
/** Enqueue a work item */
bool Enqueue(WorkItem* item)
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
LOCK(cs);
if (!running || queue.size() >= maxDepth) {
Expand All @@ -94,7 +94,7 @@ class WorkQueue
return true;
}
/** Thread function */
void Run()
void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
while (true) {
std::unique_ptr<WorkItem> i;
Expand All @@ -111,7 +111,7 @@ class WorkQueue
}
}
/** Interrupt and exit loops */
void Interrupt()
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
LOCK(cs);
running = false;
Expand Down
6 changes: 3 additions & 3 deletions src/i2p.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Session
* to the listening socket and address.
* @return true on success
*/
bool Listen(Connection& conn);
bool Listen(Connection& conn) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

/**
* Wait for and accept a new incoming connection.
Expand All @@ -103,7 +103,7 @@ class Session
* it is set to `false`. Only set if `false` is returned.
* @return true on success
*/
bool Connect(const CService& to, Connection& conn, bool& proxy_error);
bool Connect(const CService& to, Connection& conn, bool& proxy_error) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

private:
/**
Expand Down Expand Up @@ -172,7 +172,7 @@ class Session
/**
* Check the control socket for errors and possibly disconnect.
*/
void CheckControlSock();
void CheckControlSock() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);

/**
* Generate a new destination with the SAM proxy and set `m_private_key` to it.
Expand Down
2 changes: 1 addition & 1 deletion src/index/blockfilterindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BlockFilterIndex final : public BaseIndex
bool LookupFilter(const CBlockIndex* block_index, BlockFilter& filter_out) const;

/** Get a single filter header by block. */
bool LookupFilterHeader(const CBlockIndex* block_index, uint256& header_out);
bool LookupFilterHeader(const CBlockIndex* block_index, uint256& header_out) EXCLUSIVE_LOCKS_REQUIRED(!m_cs_headers_cache);

/** Get a range of filters between two heights on a chain. */
bool LookupFilterRange(int start_height, const CBlockIndex* stop_index,
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/dkgsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ void CDKGSession::RelayInvToParticipants(const CInv& inv) const
if (pnode->GetVerifiedProRegTxHash().IsNull()) {
logger.Batch("node[%d:%s] not mn",
pnode->GetId(),
pnode->GetAddrName());
pnode->m_addr_name);
} else if (relayMembers.count(pnode->GetVerifiedProRegTxHash()) == 0) {
ss2 << pnode->GetVerifiedProRegTxHash().ToString().substr(0, 4) << " | ";
}
Expand Down
10 changes: 3 additions & 7 deletions src/llmq/signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,14 +1092,13 @@ bool CSigSharesManager::SendMessages()
return session->sendSessionId;
};

std::vector<CNode*> vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly);

const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly};
{
LOCK(cs);
CollectSigSharesToRequest(sigSharesToRequest);
CollectSigSharesToSend(sigShareBatchesToSend);
CollectSigSharesToAnnounce(sigSharesToAnnounce);
CollectSigSharesToSendConcentrated(sigSharesToSend, vNodesCopy);
CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes());

for (auto& [nodeId, sigShareMap] : sigSharesToRequest) {
for (auto& [hash, sigShareInv] : sigShareMap) {
Expand All @@ -1120,7 +1119,7 @@ bool CSigSharesManager::SendMessages()

bool didSend = false;

for (auto& pnode : vNodesCopy) {
for (auto& pnode : snap.Nodes()) {
CNetMsgMaker msgMaker(pnode->GetCommonVersion());

if (const auto it1 = sigSessionAnnouncements.find(pnode->GetId()); it1 != sigSessionAnnouncements.end()) {
Expand Down Expand Up @@ -1222,9 +1221,6 @@ bool CSigSharesManager::SendMessages()
}
}

// looped through all nodes, release them
connman.ReleaseNodeVector(vNodesCopy);

return didSend;
}

Expand Down
20 changes: 6 additions & 14 deletions src/masternode/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,11 @@ void CMasternodeSync::ProcessTick()
}

nTimeLastProcess = GetTime();
std::vector<CNode*> vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly);
const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly};

// gradually request the rest of the votes after sync finished
if(IsSynced()) {
m_govman.RequestGovernanceObjectVotes(vNodesCopy, connman);
connman.ReleaseNodeVector(vNodesCopy);
m_govman.RequestGovernanceObjectVotes(snap.Nodes(), connman);
return;
}

Expand All @@ -154,7 +153,7 @@ void CMasternodeSync::ProcessTick()
LogPrint(BCLog::MNSYNC, "CMasternodeSync::ProcessTick -- nTick %d nCurrentAsset %d nTriedPeerCount %d nSyncProgress %f\n", nTick, nCurrentAsset, nTriedPeerCount, nSyncProgress);
uiInterface.NotifyAdditionalDataSyncProgressChanged(nSyncProgress);

for (auto& pnode : vNodesCopy)
for (auto& pnode : snap.Nodes())
{
CNetMsgMaker msgMaker(pnode->GetCommonVersion());

Expand Down Expand Up @@ -189,7 +188,7 @@ void CMasternodeSync::ProcessTick()
}

if (nCurrentAsset == MASTERNODE_SYNC_BLOCKCHAIN) {
int64_t nTimeSyncTimeout = vNodesCopy.size() > 3 ? MASTERNODE_SYNC_TICK_SECONDS : MASTERNODE_SYNC_TIMEOUT_SECONDS;
int64_t nTimeSyncTimeout = snap.Nodes().size() > 3 ? MASTERNODE_SYNC_TICK_SECONDS : MASTERNODE_SYNC_TIMEOUT_SECONDS;
if (fReachedBestHeader && (GetTime() - nTimeLastBumped > nTimeSyncTimeout)) {
// At this point we know that:
// a) there are peers (because we are looping on at least one of them);
Expand All @@ -205,7 +204,7 @@ void CMasternodeSync::ProcessTick()

if (gArgs.GetBoolArg("-syncmempool", DEFAULT_SYNC_MEMPOOL)) {
// Now that the blockchain is synced request the mempool from the connected outbound nodes if possible
for (auto pNodeTmp : vNodesCopy) {
for (auto pNodeTmp : snap.Nodes()) {
bool fRequestedEarlier = m_netfulfilledman.HasFulfilledRequest(pNodeTmp->addr, "mempool-sync");
if (pNodeTmp->nVersion >= 70216 && !pNodeTmp->IsInboundConn() && !fRequestedEarlier && !pNodeTmp->IsBlockRelayOnly()) {
m_netfulfilledman.AddFulfilledRequest(pNodeTmp->addr, "mempool-sync");
Expand All @@ -222,7 +221,6 @@ void CMasternodeSync::ProcessTick()
if(nCurrentAsset == MASTERNODE_SYNC_GOVERNANCE) {
if (!m_govman.IsValid()) {
SwitchToNextAsset();
connman.ReleaseNodeVector(vNodesCopy);
return;
}
LogPrint(BCLog::GOBJECT, "CMasternodeSync::ProcessTick -- nTick %d nCurrentAsset %d nTimeLastBumped %lld GetTime() %lld diff %lld\n", nTick, nCurrentAsset, nTimeLastBumped, GetTime(), GetTime() - nTimeLastBumped);
Expand All @@ -235,7 +233,6 @@ void CMasternodeSync::ProcessTick()
// it's kind of ok to skip this for now, hopefully we'll catch up later?
}
SwitchToNextAsset();
connman.ReleaseNodeVector(vNodesCopy);
return;
}

Expand All @@ -259,12 +256,11 @@ void CMasternodeSync::ProcessTick()

if (nCurrentAsset != MASTERNODE_SYNC_GOVERNANCE) {
// looped through all nodes and not syncing governance yet/already, release them
connman.ReleaseNodeVector(vNodesCopy);
return;
}

// request votes on per-obj basis from each node
for (const auto& pnode : vNodesCopy) {
for (const auto& pnode : snap.Nodes()) {
if(!m_netfulfilledman.HasFulfilledRequest(pnode->addr, "governance-sync")) {
continue; // to early for this node
}
Expand All @@ -291,16 +287,12 @@ void CMasternodeSync::ProcessTick()
// reset nTimeNoObjectsLeft to be able to use the same condition on resync
nTimeNoObjectsLeft = 0;
SwitchToNextAsset();
connman.ReleaseNodeVector(vNodesCopy);
return;
}
nLastTick = nTick;
nLastVotes = m_govman.GetVoteCount();
}
}

// looped through all nodes, release them
connman.ReleaseNodeVector(vNodesCopy);
}

void CMasternodeSync::SendGovernanceSyncRequest(CNode* pnode) const
Expand Down
Loading
Loading