From 00641815e1095f1b5265f02a5100e9f68606db9f Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Wed, 25 Oct 2023 23:21:20 -0500 Subject: [PATCH 1/3] netsync: Rename NewPeer to PeerConnected. This renames the NewPeer method to PeerConnected to more clearly denote its purpose. It also renames some of the internal plumbing to match. --- internal/netsync/manager.go | 18 +++++++++--------- server.go | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index f95e12ef2e..2edee08f8e 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -92,8 +92,8 @@ const ( // zeroHash is the zero value hash (all zeros). It is defined as a convenience. var zeroHash chainhash.Hash -// newPeerMsg signifies a newly connected peer to the event handler. -type newPeerMsg struct { +// peerConnectedMsg signifies a newly connected peer to the event handler. +type peerConnectedMsg struct { peer *peerpkg.Peer } @@ -602,10 +602,10 @@ func (m *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { return peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork } -// handleNewPeerMsg deals with new peers that have signalled they may +// handlePeerConnectedMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. -func (m *SyncManager) handleNewPeerMsg(ctx context.Context, peer *peerpkg.Peer) { +func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *peerpkg.Peer) { select { case <-ctx.Done(): default: @@ -1438,8 +1438,8 @@ out: select { case data := <-m.msgChan: switch msg := data.(type) { - case *newPeerMsg: - m.handleNewPeerMsg(ctx, msg.peer) + case *peerConnectedMsg: + m.handlePeerConnectedMsg(ctx, msg.peer) case *txMsg: m.handleTxMsg(msg) @@ -1528,10 +1528,10 @@ out: log.Trace("Sync manager event handler done") } -// NewPeer informs the sync manager of a newly active peer. -func (m *SyncManager) NewPeer(peer *peerpkg.Peer) { +// PeerConnected informs the sync manager of a newly active peer. +func (m *SyncManager) PeerConnected(peer *peerpkg.Peer) { select { - case m.msgChan <- &newPeerMsg{peer: peer}: + case m.msgChan <- &peerConnectedMsg{peer: peer}: case <-m.quit: } } diff --git a/server.go b/server.go index 462fa5f58b..d6b06592be 100644 --- a/server.go +++ b/server.go @@ -2314,7 +2314,7 @@ out: // Signal the net sync manager this peer is a new sync candidate // unless it was disconnected above. if p.Connected() { - s.syncManager.NewPeer(p.Peer) + s.syncManager.PeerConnected(p.Peer) p.syncNotifiedMtx.Lock() p.syncNotified = true p.syncNotifiedMtx.Unlock() From a3698da32d647ef8dbad36ebc119f8902fc13f42 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Wed, 25 Oct 2023 23:21:21 -0500 Subject: [PATCH 2/3] netsync: Rename DonePeer to PeerDisconnected. This renames the DonePeer method to PeerDisconnected to more clearly denote its purpose. It also renames some of the internal plumbing to match. --- internal/netsync/manager.go | 20 ++++++++++---------- server.go | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 2edee08f8e..e0f60cc661 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -126,8 +126,8 @@ type notFoundMsg struct { peer *peerpkg.Peer } -// donePeerMsg signifies a newly disconnected peer to the event handler. -type donePeerMsg struct { +// peerDisconnectedMsg signifies a newly disconnected peer to the event handler. +type peerDisconnectedMsg struct { peer *peerpkg.Peer } @@ -635,11 +635,11 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *peerpkg. } } -// handleDonePeerMsg deals with peers that have signalled they are done. It -// removes the peer as a candidate for syncing and in the case where it was +// handlePeerDisconnectedMsg deals with peers that have signalled they are done. +// It removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. -func (m *SyncManager) handleDonePeerMsg(p *peerpkg.Peer) { +func (m *SyncManager) handlePeerDisconnectedMsg(p *peerpkg.Peer) { peer := lookupPeer(p, m.peers) if peer == nil { return @@ -1464,8 +1464,8 @@ out: case *notFoundMsg: m.handleNotFoundMsg(msg) - case *donePeerMsg: - m.handleDonePeerMsg(msg.peer) + case *peerDisconnectedMsg: + m.handlePeerDisconnectedMsg(msg.peer) case getSyncPeerMsg: var peerID int32 @@ -1582,10 +1582,10 @@ func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Pe } } -// DonePeer informs the sync manager that a peer has disconnected. -func (m *SyncManager) DonePeer(peer *peerpkg.Peer) { +// PeerDisconnected informs the sync manager that a peer has disconnected. +func (m *SyncManager) PeerDisconnected(peer *peerpkg.Peer) { select { - case m.msgChan <- &donePeerMsg{peer: peer}: + case m.msgChan <- &peerDisconnectedMsg{peer: peer}: case <-m.quit: } } diff --git a/server.go b/server.go index d6b06592be..c77895fd0d 100644 --- a/server.go +++ b/server.go @@ -2266,7 +2266,7 @@ func (s *server) peerDoneHandler(sp *serverPeer) { syncNotified := sp.syncNotified sp.syncNotifiedMtx.Unlock() if syncNotified { - s.syncManager.DonePeer(sp.Peer) + s.syncManager.PeerDisconnected(sp.Peer) } if sp.VersionKnown() { From 517091c7f6d93f63075ab5ec51a35c2b58ef7458 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Wed, 25 Oct 2023 23:21:23 -0500 Subject: [PATCH 3/3] netsync: Export opaque peer and require it in API. Currently the sync manager maintains additional state per peer in an internal struct that wraps a base/common peer as well as a mapping keyed by that base/common peer. The internal wrapped peer is then queried each time it is needed. This leads to code that is harder to reason about and can fail to lookup the necessary state in some hard to hit corner cases. With that in mind, this modifies the sync manager semantics to instead export the wrapped peer and require the caller to provide that wrapped peer in all of its APIs directly. The server then creates and stores the wrapped peer instance at connection time and passes it to the sync manager. The end result is the code is easier to reason about and resolves the aforementioned hard to hit corner cases since it is no longer possible for the sync manager to ever have access to a peer without the associated extra state. --- internal/netsync/manager.go | 158 ++++++++++++++---------------------- server.go | 47 +++++------ 2 files changed, 80 insertions(+), 125 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index e0f60cc661..975ee32edf 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -10,7 +10,6 @@ import ( "errors" "fmt" "math" - "runtime/debug" "sync" "time" @@ -94,14 +93,14 @@ var zeroHash chainhash.Hash // peerConnectedMsg signifies a newly connected peer to the event handler. type peerConnectedMsg struct { - peer *peerpkg.Peer + peer *Peer } // blockMsg packages a Decred block message and the peer it came from together // so the event handler has access to that information. type blockMsg struct { block *dcrutil.Block - peer *peerpkg.Peer + peer *Peer reply chan struct{} } @@ -109,33 +108,33 @@ type blockMsg struct { // so the event handler has access to that information. type invMsg struct { inv *wire.MsgInv - peer *peerpkg.Peer + peer *Peer } // headersMsg packages a Decred headers message and the peer it came from // together so the event handler has access to that information. type headersMsg struct { headers *wire.MsgHeaders - peer *peerpkg.Peer + peer *Peer } // notFoundMsg packages a Decred notfound message and the peer it came from // together so the event handler has access to that information. type notFoundMsg struct { notFound *wire.MsgNotFound - peer *peerpkg.Peer + peer *Peer } // peerDisconnectedMsg signifies a newly disconnected peer to the event handler. type peerDisconnectedMsg struct { - peer *peerpkg.Peer + peer *Peer } // txMsg packages a Decred tx message and the peer it came from together // so the event handler has access to that information. type txMsg struct { tx *dcrutil.Tx - peer *peerpkg.Peer + peer *Peer reply chan struct{} } @@ -150,7 +149,7 @@ type getSyncPeerMsg struct { // this through the sync manager so the sync manager doesn't ban the peer // when it sends this information back. type requestFromPeerMsg struct { - peer *peerpkg.Peer + peer *Peer blocks []chainhash.Hash voteHashes []chainhash.Hash tSpendHashes []chainhash.Hash @@ -180,9 +179,10 @@ type processBlockMsg struct { reply chan processBlockResponse } -// syncMgrPeer extends a peer to maintain additional state maintained by the -// sync manager. -type syncMgrPeer struct { +// Peer extends a common peer to maintain additional state needed by the sync +// manager. The internals are intentionally unexported to create an opaque +// type. +type Peer struct { *peerpkg.Peer syncCandidate bool @@ -202,6 +202,18 @@ type syncMgrPeer struct { lastAnnouncedBlock *chainhash.Hash } +// NewPeer returns a new instance of a peer that wraps the provided underlying +// common peer with additional state that is used throughout the package. +func NewPeer(peer *peerpkg.Peer) *Peer { + isSyncCandidate := peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork + return &Peer{ + Peer: peer, + syncCandidate: isSyncCandidate, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + } +} + // headerSyncState houses the state used to track the header sync progress and // related stall handling. type headerSyncState struct { @@ -283,9 +295,9 @@ type SyncManager struct { requestedTxns map[chainhash.Hash]struct{} requestedBlocks map[chainhash.Hash]struct{} progressLogger *progresslog.Logger - syncPeer *syncMgrPeer + syncPeer *Peer msgChan chan interface{} - peers map[*peerpkg.Peer]*syncMgrPeer + peers map[*Peer]struct{} // hdrSyncState houses the state used to track the initial header sync // process and related stall handling. @@ -323,19 +335,6 @@ type SyncManager struct { nextNeededBlocks []chainhash.Hash } -// lookupPeer returns the sync manager peer that maintains additional state for -// a given base peer. In the event the mapping does not exist, a warning is -// logged and nil is returned. -func lookupPeer(peer *peerpkg.Peer, peers map[*peerpkg.Peer]*syncMgrPeer) *syncMgrPeer { - sp, ok := peers[peer] - if !ok { - log.Warnf("Attempt to lookup unknown peer %s\nStack: %v", peer, - string(debug.Stack())) - return nil - } - return sp -} - // SyncHeight returns latest known block being synced to. func (m *SyncManager) SyncHeight() int64 { m.syncHeightMtx.Lock() @@ -381,7 +380,7 @@ func (m *SyncManager) maybeUpdateNextNeededBlocks() { // fetchNextBlocks creates and sends a request to the provided peer for the next // blocks to be downloaded based on the current headers. -func (m *SyncManager) fetchNextBlocks(peer *syncMgrPeer) { +func (m *SyncManager) fetchNextBlocks(peer *Peer) { // Nothing to do if the target maximum number of blocks to request from the // peer at the same time are already in flight. numInFlight := len(peer.requestedBlocks) @@ -439,8 +438,8 @@ func (m *SyncManager) startSync() { chain := m.cfg.Chain best := chain.BestSnapshot() - var bestPeer *syncMgrPeer - for _, peer := range m.peers { + var bestPeer *Peer + for peer := range m.peers { if !peer.syncCandidate { continue } @@ -549,7 +548,7 @@ func (m *SyncManager) startSync() { // // The request will not be sent more than once or when the peer is in the // process of being removed. -func maybeRequestInitialState(peer *syncMgrPeer) { +func maybeRequestInitialState(peer *Peer) { // Don't request the initial state more than once or when the peer is in the // process of being removed. if peer.initialStateRequested || !peer.Connected() { @@ -589,23 +588,16 @@ func (m *SyncManager) onInitialChainSyncDone() { // Request initial state from all peers that are marked as needing it now // that the initial chain sync is done when enabled. if !m.cfg.NoMiningStateSync { - for _, peer := range m.peers { + for peer := range m.peers { maybeRequestInitialState(peer) } } } -// isSyncCandidate returns whether or not the peer is a candidate to consider -// syncing from. -func (m *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { - // The peer is not a candidate for sync if it's not a full node. - return peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork -} - // handlePeerConnectedMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. -func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *peerpkg.Peer) { +func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *Peer) { select { case <-ctx.Done(): default: @@ -613,17 +605,10 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *peerpkg. log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) - // Initialize the peer state - isSyncCandidate := m.isSyncCandidate(peer) - m.peers[peer] = &syncMgrPeer{ - Peer: peer, - syncCandidate: isSyncCandidate, - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - } + m.peers[peer] = struct{}{} // Start syncing by choosing the best candidate if needed. - if isSyncCandidate && m.syncPeer == nil { + if peer.syncCandidate && m.syncPeer == nil { m.startSync() } @@ -631,7 +616,7 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *peerpkg. // believes the chain is fully synced. Otherwise, it will be requested when // the initial chain sync process is complete. if !m.cfg.NoMiningStateSync && m.IsCurrent() { - maybeRequestInitialState(m.peers[peer]) + maybeRequestInitialState(peer) } } @@ -639,32 +624,27 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *peerpkg. // It removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. -func (m *SyncManager) handlePeerDisconnectedMsg(p *peerpkg.Peer) { - peer := lookupPeer(p, m.peers) - if peer == nil { - return - } - +func (m *SyncManager) handlePeerDisconnectedMsg(peer *Peer) { // Remove the peer from the list of candidate peers. - delete(m.peers, p) + delete(m.peers, peer) // Re-request in-flight blocks and transactions that were not received // by the disconnected peer if the data was announced by another peer. // Remove the data from the manager's requested data maps if no other // peers have announced the data. - requestQueues := make(map[*peerpkg.Peer][]wire.InvVect) + requestQueues := make(map[*Peer][]wire.InvVect) var inv wire.InvVect inv.Type = wire.InvTypeTx TxHashes: for txHash := range peer.requestedTxns { inv.Hash = txHash - for pp, mgrPeer := range m.peers { + for pp := range m.peers { if !pp.IsKnownInventory(&inv) { continue } invs := append(requestQueues[pp], inv) requestQueues[pp] = invs - mgrPeer.requestedTxns[txHash] = struct{}{} + pp.requestedTxns[txHash] = struct{}{} continue TxHashes } // No peers found that have announced this data. @@ -674,13 +654,13 @@ TxHashes: BlockHashes: for blockHash := range peer.requestedBlocks { inv.Hash = blockHash - for pp, mgrPeer := range m.peers { + for pp := range m.peers { if !pp.IsKnownInventory(&inv) { continue } invs := append(requestQueues[pp], inv) requestQueues[pp] = invs - mgrPeer.requestedBlocks[blockHash] = struct{}{} + pp.requestedBlocks[blockHash] = struct{}{} continue BlockHashes } // No peers found that have announced this data. @@ -720,10 +700,7 @@ BlockHashes: // handleTxMsg handles transaction messages from all peers. func (m *SyncManager) handleTxMsg(tmsg *txMsg) { - peer := lookupPeer(tmsg.peer, m.peers) - if peer == nil { - return - } + peer := tmsg.peer // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of // sending an inventory message and allowing the remote peer to decide @@ -831,10 +808,7 @@ func (m *SyncManager) processBlock(block *dcrutil.Block) (int64, error) { // handleBlockMsg handles block messages from all peers. func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { - peer := lookupPeer(bmsg.peer, m.peers) - if peer == nil { - return - } + peer := bmsg.peer // The remote peer is misbehaving when the block was not requested. blockHash := bmsg.block.Hash() @@ -941,7 +915,7 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { blockHeight := int64(header.Height) peer.UpdateLastBlockHeight(blockHeight) if onMainChain && m.IsCurrent() { - for _, p := range m.peers { + for p := range m.peers { // The height for the sending peer is already updated. if p == peer { continue @@ -1023,10 +997,7 @@ func (m *SyncManager) headerSyncProgress() float64 { // handleHeadersMsg handles headers messages from all peers. func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { - peer := lookupPeer(hmsg.peer, m.peers) - if peer == nil { - return - } + peer := hmsg.peer // Nothing to do for an empty headers message as it means the sending peer // does not have any additional headers for the requested block locator. @@ -1262,10 +1233,7 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // handleNotFoundMsg handles notfound messages from all peers. func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { - peer := lookupPeer(nfmsg.peer, m.peers) - if peer == nil { - return - } + peer := nfmsg.peer for _, inv := range nfmsg.notFound.InvList { // verify the hash was actually announced by the peer @@ -1311,10 +1279,7 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool { // inventory advertised by the remote peer for block and transaction // announcements and acting accordingly. func (m *SyncManager) handleInvMsg(imsg *invMsg) { - peer := lookupPeer(imsg.peer, m.peers) - if peer == nil { - return - } + peer := imsg.peer isCurrent := m.IsCurrent() // Update state information regarding per-peer known inventory and determine @@ -1529,7 +1494,7 @@ out: } // PeerConnected informs the sync manager of a newly active peer. -func (m *SyncManager) PeerConnected(peer *peerpkg.Peer) { +func (m *SyncManager) PeerConnected(peer *Peer) { select { case m.msgChan <- &peerConnectedMsg{peer: peer}: case <-m.quit: @@ -1538,7 +1503,7 @@ func (m *SyncManager) PeerConnected(peer *peerpkg.Peer) { // QueueTx adds the passed transaction message and peer to the event handling // queue. -func (m *SyncManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan struct{}) { +func (m *SyncManager) QueueTx(tx *dcrutil.Tx, peer *Peer, done chan struct{}) { select { case m.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}: case <-m.quit: @@ -1548,7 +1513,7 @@ func (m *SyncManager) QueueTx(tx *dcrutil.Tx, peer *peerpkg.Peer, done chan stru // QueueBlock adds the passed block message and peer to the event handling // queue. -func (m *SyncManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done chan struct{}) { +func (m *SyncManager) QueueBlock(block *dcrutil.Block, peer *Peer, done chan struct{}) { select { case m.msgChan <- &blockMsg{block: block, peer: peer, reply: done}: case <-m.quit: @@ -1557,7 +1522,7 @@ func (m *SyncManager) QueueBlock(block *dcrutil.Block, peer *peerpkg.Peer, done } // QueueInv adds the passed inv message and peer to the event handling queue. -func (m *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { +func (m *SyncManager) QueueInv(inv *wire.MsgInv, peer *Peer) { select { case m.msgChan <- &invMsg{inv: inv, peer: peer}: case <-m.quit: @@ -1566,7 +1531,7 @@ func (m *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { // QueueHeaders adds the passed headers message and peer to the event handling // queue. -func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { +func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *Peer) { select { case m.msgChan <- &headersMsg{headers: headers, peer: peer}: case <-m.quit: @@ -1575,7 +1540,7 @@ func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) // QueueNotFound adds the passed notfound message and peer to the event handling // queue. -func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) { +func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *Peer) { select { case m.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer}: case <-m.quit: @@ -1583,7 +1548,7 @@ func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Pe } // PeerDisconnected informs the sync manager that a peer has disconnected. -func (m *SyncManager) PeerDisconnected(peer *peerpkg.Peer) { +func (m *SyncManager) PeerDisconnected(peer *Peer) { select { case m.msgChan <- &peerDisconnectedMsg{peer: peer}: case <-m.quit: @@ -1609,7 +1574,7 @@ func (m *SyncManager) SyncPeerID() int32 { // RequestFromPeer allows an outside caller to request blocks or transactions // from a peer. The requests are logged in the internal map of requests so the // peer is not later banned for sending the respective data. -func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, voteHashes, +func (m *SyncManager) RequestFromPeer(p *Peer, blocks, voteHashes, tSpendHashes []chainhash.Hash) error { reply := make(chan requestFromPeerResponse, 1) @@ -1633,14 +1598,9 @@ func (m *SyncManager) RequestFromPeer(p *peerpkg.Peer, blocks, voteHashes, } } -func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, voteHashes, +func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, tSpendHashes []chainhash.Hash) error { - peer := lookupPeer(p, m.peers) - if peer == nil { - return fmt.Errorf("unknown peer %s", p) - } - // Add the blocks to the request. msgResp := wire.NewMsgGetData() for i := range blocks { @@ -1747,7 +1707,7 @@ func (m *SyncManager) requestFromPeer(p *peerpkg.Peer, blocks, voteHashes, } if len(msgResp.InvList) > 0 { - p.QueueMessage(msgResp, nil) + peer.QueueMessage(msgResp, nil) } return nil @@ -1863,7 +1823,7 @@ func New(config *Config) *SyncManager { rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), - peers: make(map[*peerpkg.Peer]*syncMgrPeer), + peers: make(map[*Peer]struct{}), minKnownWork: minKnownWork, hdrSyncState: makeHeaderSyncState(), progressLogger: progresslog.New("Processed", log), diff --git a/server.go b/server.go index c77895fd0d..2633bd97c4 100644 --- a/server.go +++ b/server.go @@ -535,6 +535,10 @@ type serverPeer struct { banScore connmgr.DynamicBanScore quit chan struct{} + // syncMgrPeer houses the network sync manager peer instance that wraps the + // underlying peer similar to the way this server peer itself wraps it. + syncMgrPeer *netsync.Peer + // addrsSent, getMiningStateSent and initState all track whether or not // the peer has already sent the respective request. It is used to // prevent more than one response per connection. @@ -544,10 +548,8 @@ type serverPeer struct { // The following fields are used to synchronize the net sync manager and // server. - syncNotifiedMtx sync.Mutex - syncNotified bool - txProcessed chan struct{} - blockProcessed chan struct{} + txProcessed chan struct{} + blockProcessed chan struct{} // peerNa is network address of the peer connected to. peerNa *wire.NetAddress @@ -947,7 +949,7 @@ func (sp *serverPeer) OnMiningState(_ *peer.Peer, msg *wire.MsgMiningState) { } } - err := sp.server.syncManager.RequestFromPeer(sp.Peer, blockHashes, + err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, blockHashes, voteHashes, nil) if err != nil { peerLog.Warnf("couldn't handle mining state message: %v", @@ -1034,8 +1036,8 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) { // OnInitState is invoked when a peer receives a initstate wire message. It // requests the data advertised in the message from the peer. func (sp *serverPeer) OnInitState(_ *peer.Peer, msg *wire.MsgInitState) { - err := sp.server.syncManager.RequestFromPeer(sp.Peer, msg.BlockHashes, - msg.VoteHashes, msg.TSpendHashes) + err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, + msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes) if err != nil { peerLog.Warnf("couldn't handle init state message: %v", err) } @@ -1064,7 +1066,7 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad transactions before disconnecting (or // being disconnected) and wasting memory. - sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed) + sp.server.syncManager.QueueTx(tx, sp.syncMgrPeer, sp.txProcessed) <-sp.txProcessed } @@ -1087,7 +1089,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { // depended on by at least the block acceptance test tool as the reference // implementation processes blocks in the same thread and therefore blocks // further messages until the network block has been fully processed. - sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed) + sp.server.syncManager.QueueBlock(block, sp.syncMgrPeer, sp.blockProcessed) <-sp.blockProcessed } @@ -1096,14 +1098,14 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { // accordingly. We pass the message down to the net sync manager which will // call QueueMessage with any appropriate responses. func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { - // Ban peers sending empty inventory requests. + // Ban peers sending empty inventory announcements. if len(msg.InvList) == 0 { sp.server.BanPeer(sp) return } if !cfg.BlocksOnly { - sp.server.syncManager.QueueInv(msg, sp.Peer) + sp.server.syncManager.QueueInv(msg, sp.syncMgrPeer) return } @@ -1122,13 +1124,13 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { } } - sp.server.syncManager.QueueInv(newInv, sp.Peer) + sp.server.syncManager.QueueInv(newInv, sp.syncMgrPeer) } // OnHeaders is invoked when a peer receives a headers wire message. The // message is passed down to the net sync manager. func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { - sp.server.syncManager.QueueHeaders(msg, sp.Peer) + sp.server.syncManager.QueueHeaders(msg, sp.syncMgrPeer) } // handleGetData is invoked when a peer receives a getdata wire message and is @@ -1483,7 +1485,7 @@ func (sp *serverPeer) OnNotFound(_ *peer.Peer, msg *wire.MsgNotFound) { return } } - sp.server.syncManager.QueueNotFound(msg, sp.Peer) + sp.server.syncManager.QueueNotFound(msg, sp.syncMgrPeer) } // randomUint16Number returns a random uint16 in a specified input range. Note @@ -2231,6 +2233,7 @@ func (s *server) inboundPeerConnected(conn net.Conn) { sp := newServerPeer(s, false) sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) + sp.syncMgrPeer = netsync.NewPeer(sp.Peer) sp.AssociateConnection(conn) go s.peerDoneHandler(sp) } @@ -2248,6 +2251,7 @@ func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { return } sp.Peer = p + sp.syncMgrPeer = netsync.NewPeer(sp.Peer) sp.connReq = c sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.AssociateConnection(conn) @@ -2260,14 +2264,8 @@ func (s *server) peerDoneHandler(sp *serverPeer) { sp.WaitForDisconnect() s.DonePeer(sp) - // Notify the net sync manager the peer is gone if it was ever notified that - // the peer existed. - sp.syncNotifiedMtx.Lock() - syncNotified := sp.syncNotified - sp.syncNotifiedMtx.Unlock() - if syncNotified { - s.syncManager.PeerDisconnected(sp.Peer) - } + // Notify the net sync manager the peer is gone. + s.syncManager.PeerDisconnected(sp.syncMgrPeer) if sp.VersionKnown() { // Evict any remaining orphans that were sent by the peer. @@ -2314,10 +2312,7 @@ out: // Signal the net sync manager this peer is a new sync candidate // unless it was disconnected above. if p.Connected() { - s.syncManager.PeerConnected(p.Peer) - p.syncNotifiedMtx.Lock() - p.syncNotified = true - p.syncNotifiedMtx.Unlock() + s.syncManager.PeerConnected(p.syncMgrPeer) } // Disconnected peers.