diff --git a/server.go b/server.go index dfb346a597..7a073177e5 100644 --- a/server.go +++ b/server.go @@ -94,6 +94,29 @@ const ( // submissions cached. maxCachedNaSubmissions = 20 + // These constants control the maximum number of simultaneous pending + // getdata messages and the individual data item requests they make without + // being disconnected. + // + // Since each getdata message is comprised of several individual data item + // requests, the limiting is applied on both dimensions to offer more + // flexibility while still keeping memory usage bounded to reasonable + // limits. + // + // maxConcurrentGetDataReqs is the maximum number of simultaneous pending + // getdata message requests. + // + // maxPendingGetDataItemReqs is the maximum number of overall total + // simultaneous pending individual data item requests. + // + // In other words, when combined, a peer may mix and match simultaneous + // getdata requests for varying amounts of data items so long as it does not + // exceed the maximum specified number of simultaneous pending getdata + // messages or the maximum number of total overall pending individual data + // item requests. + maxConcurrentGetDataReqs = 1000 + maxPendingGetDataItemReqs = 2 * wire.MaxInvPerMsg + // maxReorgDepthNotify specifies the maximum reorganization depth for which // winning ticket notifications will be sent over RPC. The reorg depth is // the number of blocks that would be reorganized out of the current best @@ -527,7 +550,7 @@ type serverPeer struct { connReq *connmgr.ConnReq server *server persistent bool - continueHash *chainhash.Hash + continueHash atomic.Pointer[chainhash.Hash] relayMtx sync.Mutex disableRelayTx bool isWhitelisted bool @@ -558,6 +581,17 @@ type serverPeer struct { // announcedBlock tracks the most recent block announced to this peer and is // used to filter duplicates. announcedBlock *chainhash.Hash + + // The following fields are used to serve getdata requests asynchronously as + // opposed to directly in the peer input handler. + // + // getDataQueue is a buffered channel for queueing up concurrent getdata + // requests. + // + // numPendingGetDataItemReqs tracks the total number of pending individual + // data item requests that still need to be served. + getDataQueue chan []*wire.InvVect + numPendingGetDataItemReqs atomic.Uint32 } // newServerPeer returns a new serverPeer instance. The peer needs to be set by @@ -570,9 +604,188 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer { quit: make(chan struct{}), txProcessed: make(chan struct{}, 1), blockProcessed: make(chan struct{}, 1), + getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), } } +// handleServeGetData is the primary logic for servicing queued getdata +// requests. +// +// It makes use of the given send done channel and semaphore to provide +// a little pipelining of database loads while keeping the memory usage bounded +// to reasonable limits. +// +// It is invoked from the serveGetData goroutine. +func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, + sendDoneChan chan struct{}, semaphore chan struct{}) { + + var notFoundMsg *wire.MsgNotFound + for _, iv := range invVects { + var sendInv bool + var dataMsg wire.Message + switch iv.Type { + case wire.InvTypeTx: + // Attempt to fetch the requested transaction from the pool. A call + // could be made to check for existence first, but simply trying to + // fetch a missing transaction results in the same behavior. Do not + // allow peers to request transactions already in a block but are + // unconfirmed, as they may be expensive. Restrict that to the + // authenticated RPC only. + txHash := &iv.Hash + tx, err := sp.server.txMemPool.FetchTransaction(txHash) + if err != nil { + peerLog.Tracef("Unable to fetch tx %v from transaction pool: %v", + txHash, err) + break + } + dataMsg = tx.MsgTx() + + case wire.InvTypeBlock: + blockHash := &iv.Hash + block, err := sp.server.chain.BlockByHash(blockHash) + if err != nil { + peerLog.Tracef("Unable to fetch requested block hash %v: %v", + blockHash, err) + break + } + dataMsg = block.MsgBlock() + + // When the peer requests the final block that was advertised in + // response to a getblocks message which requested more blocks than + // would fit into a single message, it requires a new inventory + // message to trigger it to issue another getblocks message for the + // next batch of inventory. + // + // However, that inventory message should not be sent until after + // the block itself is sent, so keep a flag for later use. + // + // Note that this is to support the legacy syncing model that is no + // longer used in dcrd which is now based on a much more robust + // headers-based syncing model. Nevertheless, this behavior is + // still a required part of the getblocks protocol semantics. It + // can be removed if a future protocol upgrade also removes the + // getblocks message. + continueHash := sp.continueHash.Load() + sendInv = continueHash != nil && continueHash == blockHash + + default: + peerLog.Warnf("Unknown type '%d' in inventory request from %s", + iv.Type, sp) + continue + } + if dataMsg == nil { + if notFoundMsg == nil { + notFoundMsg = wire.NewMsgNotFound() + } + notFoundMsg.AddInvVect(iv) + } + + // Limit the number of items that can be queued to prevent wasting a + // bunch of memory by queuing far more data than can be sent in a + // reasonable time. The waiting occurs after the database fetch for the + // next one to provide a little pipelining. + // + // This also monitors the channel that is notified when queued messages + // are sent in order to release the semaphore without needing a separate + // monitoring goroutine. + for semAcquired := false; !semAcquired; { + select { + case <-sp.quit: + return + + case semaphore <- struct{}{}: + semAcquired = true + + case <-sendDoneChan: + // Release semaphore. + <-semaphore + } + } + + // Decrement the pending data item requests accordingly and queue the + // data to be sent to the peer. + sp.numPendingGetDataItemReqs.Add(^uint32(0)) + sp.QueueMessage(dataMsg, sendDoneChan) + + // Send a new inventory message to trigger the peer to issue another + // getblocks message for the next batch of inventory if needed. + if sendInv { + best := sp.server.chain.BestSnapshot() + invMsg := wire.NewMsgInvSizeHint(1) + iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash) + invMsg.AddInvVect(iv) + sp.QueueMessage(invMsg, nil) + sp.continueHash.Store(nil) + } + } + if notFoundMsg != nil { + sp.QueueMessage(notFoundMsg, nil) + } +} + +// serveGetData provides an asynchronous queue that services all data requested +// via getdata requests such that the peer may mix and match simultaneous +// getdata requests for varying amounts of data items so long as it does not +// exceed the maximum number of simultaneous pending getdata messages or the +// maximum number of total overall pending data item requests. +// +// It must be run in a goroutine. +func (sp *serverPeer) serveGetData() { + // Allow a max number of items to be loaded from the database/mempool and + // queued for send. + const maxPendingSend = 3 + sendDoneChan := make(chan struct{}, maxPendingSend+1) + semaphore := make(chan struct{}, maxPendingSend) + + for { + select { + case <-sp.quit: + return + + case invVects := <-sp.getDataQueue: + sp.handleServeGetData(invVects, sendDoneChan, semaphore) + + // Release the semaphore as queued messages are sent. + case <-sendDoneChan: + <-semaphore + } + } +} + +// Run starts additional async processing for the peer and blocks until the peer +// disconnects at which point it notifies the server and net sync manager that +// the peer has disconnected and performs other associated cleanup such as +// evicting any remaining orphans sent by the peer and shutting down all +// goroutines. +func (sp *serverPeer) Run() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + sp.serveGetData() + wg.Done() + }() + + // Wait for the peer to disconnect and notify the net sync manager and + // server accordingly. + sp.WaitForDisconnect() + srvr := sp.server + srvr.DonePeer(sp) + srvr.syncManager.PeerDisconnected(sp.syncMgrPeer) + + if sp.VersionKnown() { + // Evict any remaining orphans that were sent by the peer. + numEvicted := srvr.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) + if numEvicted > 0 { + srvrLog.Debugf("Evicted %d %s from peer %v (id %d)", numEvicted, + pickNoun(numEvicted, "orphan", "orphans"), sp, sp.ID()) + } + } + + // Shutdown remaining peer goroutines. + close(sp.quit) + wg.Wait() +} + // newestBlock returns the current best block hash and height using the format // required by the configuration for the peer package. func (sp *serverPeer) newestBlock() (*chainhash.Hash, int64, error) { @@ -1132,8 +1345,8 @@ func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { sp.server.syncManager.QueueHeaders(msg, sp.syncMgrPeer) } -// handleGetData is invoked when a peer receives a getdata wire message and is -// used to deliver block and transaction information. +// OnGetData is invoked when a peer receives a getdata wire message and is used +// to deliver block and transaction information. func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { // Ban peers sending empty getdata requests. if len(msg.InvList) == 0 { @@ -1141,74 +1354,47 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { return } - numAdded := 0 - notFound := wire.NewMsgNotFound() - - length := len(msg.InvList) // A decaying ban score increase is applied to prevent exhausting resources // with unusually large inventory queries. + // // Requesting more than the maximum inventory vector length within a short - // period of time yields a score above the default ban threshold. Sustained + // period of time yields a score above the default ban threshold. Sustained // bursts of small requests are not penalized as that would potentially ban - // peers performing IBD. + // peers performing the inintial chain sync. + // // This incremental score decays each minute to half of its value. - if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") { + numNewReqs := uint32(len(msg.InvList)) + if sp.addBanScore(0, numNewReqs*99/wire.MaxInvPerMsg, "getdata") { return } - // We wait on this wait channel periodically to prevent queuing - // far more data than we can send in a reasonable time, wasting memory. - // The waiting occurs after the database fetch for the next one to - // provide a little pipelining. - var waitChan chan struct{} - doneChan := make(chan struct{}, 1) - - for i, iv := range msg.InvList { - var c chan struct{} - // If this will be the last message we send. - if i == length-1 && len(notFound.InvList) == 0 { - c = doneChan - } else if (i+1)%3 == 0 { - // Buffered so as to not make the send goroutine block. - c = make(chan struct{}, 1) - } - var err error - switch iv.Type { - case wire.InvTypeTx: - err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan) - case wire.InvTypeBlock: - err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan) - default: - peerLog.Warnf("Unknown type '%d' in inventory request from %s", - iv.Type, sp) - continue - } - if err != nil { - notFound.AddInvVect(iv) - - // When there is a failure fetching the final entry - // and the done channel was sent in due to there - // being no outstanding not found inventory, consume - // it here because there is now not found inventory - // that will use the channel momentarily. - if i == len(msg.InvList)-1 && c != nil { - <-c - } - } - numAdded++ - waitChan = c + // Prevent too many outstanding requests while still allowing the + // flexibility to send multiple simultaneous getdata requests that are + // served asynchronously. + numPendingGetDataReqs := len(sp.getDataQueue) + if numPendingGetDataReqs+1 > maxConcurrentGetDataReqs { + peerLog.Debugf("%s exceeded max allowed concurrent pending getdata "+ + "requests (max %d) -- disconnecting", sp, maxConcurrentGetDataReqs) + sp.Disconnect() + return } - if len(notFound.InvList) != 0 { - sp.QueueMessage(notFound, doneChan) + numPendingDataItemReqs := sp.numPendingGetDataItemReqs.Load() + if numPendingDataItemReqs+numNewReqs > maxPendingGetDataItemReqs { + peerLog.Debugf("%s exceeded max allowed pending data item requests "+ + "(new %d, pending %d, max %d) -- disconnecting", sp, numNewReqs, + numPendingDataItemReqs, maxPendingGetDataItemReqs) + sp.Disconnect() + return } - // Wait for messages to be sent. We can send quite a lot of data at this - // point and this will keep the peer busy for a decent amount of time. - // We don't process anything else by them in this time so that we - // have an idea of when we should hear back from them - else the idle - // timeout could fire when we were only half done sending the blocks. - if numAdded > 0 { - <-doneChan + // Queue the data requests to be served asynchronously. Note that this will + // not block due to the use of a buffered channel and the checks above that + // disconnect the peer when the new request would otherwise exceed the + // capacity. + sp.numPendingGetDataItemReqs.Add(numNewReqs) + select { + case <-sp.quit: + case sp.getDataQueue <- msg.InvList: } } @@ -1246,7 +1432,7 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) { // would prevent the entire slice from being eligible // for GC as soon as it's sent. continueHash := invMsg.InvList[invListLen-1].Hash - sp.continueHash = &continueHash + sp.continueHash.Store(&continueHash) } sp.QueueMessage(invMsg, nil) } @@ -1597,81 +1783,6 @@ func (s *server) TransactionConfirmed(tx *dcrutil.Tx) { } } -// pushTxMsg sends a tx message for the provided transaction hash to the -// connected peer. An error is returned if the transaction hash is not known. -func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { - // Attempt to fetch the requested transaction from the pool. A - // call could be made to check for existence first, but simply trying - // to fetch a missing transaction results in the same behavior. - // Do not allow peers to request transactions already in a block - // but are unconfirmed, as they may be expensive. Restrict that - // to the authenticated RPC only. - tx, err := s.txMemPool.FetchTransaction(hash) - if err != nil { - peerLog.Tracef("Unable to fetch tx %v from transaction "+ - "pool: %v", hash, err) - - if doneChan != nil { - doneChan <- struct{}{} - } - return err - } - - // Once we have fetched data wait for any previous operation to finish. - if waitChan != nil { - <-waitChan - } - - sp.QueueMessage(tx.MsgTx(), doneChan) - - return nil -} - -// pushBlockMsg sends a block message for the provided block hash to the -// connected peer. An error is returned if the block hash is not known. -func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { - block, err := sp.server.chain.BlockByHash(hash) - if err != nil { - peerLog.Tracef("Unable to fetch requested block hash %v: %v", - hash, err) - - if doneChan != nil { - doneChan <- struct{}{} - } - return err - } - - // Once we have fetched data wait for any previous operation to finish. - if waitChan != nil { - <-waitChan - } - - // We only send the channel for this message if we aren't sending - // an inv straight after. - var dc chan<- struct{} - continueHash := sp.continueHash - sendInv := continueHash != nil && continueHash.IsEqual(hash) - if !sendInv { - dc = doneChan - } - sp.QueueMessage(block.MsgBlock(), dc) - - // When the peer requests the final block that was advertised in - // response to a getblocks message which requested more blocks than - // would fit into a single message, send it a new inventory message - // to trigger it to issue another getblocks message for the next - // batch of inventory. - if sendInv { - best := sp.server.chain.BestSnapshot() - invMsg := wire.NewMsgInvSizeHint(1) - iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash) - invMsg.AddInvVect(iv) - sp.QueueMessage(invMsg, doneChan) - sp.continueHash = nil - } - return nil -} - // handleAddPeerMsg deals with adding new peers. It is invoked from the // peerHandler goroutine. func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { @@ -2225,21 +2336,22 @@ func newPeerConfig(sp *serverPeer) *peer.Config { // inboundPeerConnected is invoked by the connection manager when a new inbound // connection is established. It initializes a new inbound server peer -// instance, associates it with the connection, and starts a goroutine to wait -// for disconnection. +// instance, associates it with the connection, and starts all additional server +// peer processing goroutines. func (s *server) inboundPeerConnected(conn net.Conn) { sp := newServerPeer(s, false) sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) sp.syncMgrPeer = netsync.NewPeer(sp.Peer) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) + go sp.Run() } // outboundPeerConnected is invoked by the connection manager when a new // outbound connection is established. It initializes a new outbound server // peer instance, associates it with the relevant state such as the connection -// request instance and the connection itself. +// request instance and the connection itself, and start all additional server +// peer processing goroutines. func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { sp := newServerPeer(s, c.Permanent) p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String()) @@ -2253,27 +2365,7 @@ func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { sp.connReq = c sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) -} - -// peerDoneHandler handles peer disconnects by notifying the server that it's -// done along with other performing other desirable cleanup. -func (s *server) peerDoneHandler(sp *serverPeer) { - sp.WaitForDisconnect() - s.DonePeer(sp) - - // Notify the net sync manager the peer is gone. - s.syncManager.PeerDisconnected(sp.syncMgrPeer) - - if sp.VersionKnown() { - // Evict any remaining orphans that were sent by the peer. - numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) - if numEvicted > 0 { - srvrLog.Debugf("Evicted %d %s from peer %v (id %d)", numEvicted, - pickNoun(numEvicted, "orphan", "orphans"), sp, sp.ID()) - } - } - close(sp.quit) + go sp.Run() } // peerHandler is used to handle peer operations such as adding and removing