Skip to content

Commit

Permalink
network: Allow short-lived connections to query /status endpoint when…
Browse files Browse the repository at this point in the history
… at full capacity (#6009)

Co-authored-by: ohill <[email protected]>
  • Loading branch information
hsoerensen and ohill authored Jun 11, 2024
1 parent 88b0ca5 commit 985512b
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 14 deletions.
2 changes: 1 addition & 1 deletion config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type Local struct {
LogArchiveDir string `version[31]:""`

// IncomingConnectionsLimit specifies the max number of incoming connections
// for the port configured in NetAddress. 0 means no connections allowed. Must be non-negative.
// for the gossip protocol configured in NetAddress. 0 means no connections allowed. Must be non-negative.
// Estimating 1.5MB per incoming connection, 1.5MB*2400 = 3.6GB
IncomingConnectionsLimit int `version[0]:"-1" version[1]:"10000" version[17]:"800" version[27]:"2400"`

Expand Down
3 changes: 2 additions & 1 deletion daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/network/limitlistener"
"github.com/algorand/go-algorand/node"
"github.com/algorand/go-algorand/util"
Expand Down Expand Up @@ -146,7 +147,7 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes

if cfg.IsGossipServer() {
var ot basics.OverflowTracker
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit))
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)+network.ReservedHealthServiceConnections)
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit or IncomingConnectionsLimit")
}
Expand Down
17 changes: 15 additions & 2 deletions network/requestTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (rt *RequestTracker) sendBlockedConnectionResponse(conn net.Conn, requestTi
}
}

// pruneAcceptedConnections cleans stale items from the acceptedConnections map; it's synchronized via the acceptedConnectionsMu mutex which is expected to be taken by the caller.
// pruneAcceptedConnections cleans stale items from the acceptedConnections map; it's synchronized via the hostRequestsMu mutex which is expected to be taken by the caller.
// in case the created is 0, the pruning is disabled for this connection. The HTTP handlers would call Close to have this entry cleared out.
func (rt *RequestTracker) pruneAcceptedConnections(pruneStartDate time.Time) {
localAddrToRemove := []net.Addr{}
Expand Down Expand Up @@ -494,6 +494,20 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.
localAddr := request.Context().Value(http.LocalAddrContextKey).(net.Addr)

rt.hostRequestsMu.Lock()
// Check if the number of connections exceeds the limit
acceptedConnections := len(rt.acceptedConnections)

if acceptedConnections > rt.config.IncomingConnectionsLimit && request.URL.Path != HealthServiceStatusPath {
rt.hostRequestsMu.Unlock()
// If the limit is exceeded, reject the connection
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "rt_incoming_connection_limit"})
rt.log.EventWithDetails(telemetryspec.Network, telemetryspec.ConnectPeerFailEvent,
telemetryspec.ConnectPeerFailEventDetails{
Address: localAddr.String(), Incoming: true, Reason: "RequestTracker Connection Limit"})
response.WriteHeader(http.StatusServiceUnavailable)
return
}

trackedRequest := rt.acceptedConnections[localAddr]
if trackedRequest != nil {
// update the original tracker request so that it won't get pruned.
Expand Down Expand Up @@ -550,7 +564,6 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.

// send the request downstream; in our case, it would go to the router.
rt.downstreamHandler.ServeHTTP(response, request)

}

// remoteHostProxyFix updates the origin IP address in the trackedRequest
Expand Down
10 changes: 9 additions & 1 deletion network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ const testingPublicAddress = "testing"
// Maximum number of bytes to read from a header when trying to establish a websocket connection.
const wsMaxHeaderBytes = 4096

// ReservedHealthServiceConnections is the number of additional connections reserved for the health check endpoint, so
// that the health check service can still be queried when a node is serving maximum peers. The file descriptors will be
// used from the ReservedFDs pool, as this pool is meant for short-lived usage (DNS queries, disk I/O, etc.)
const ReservedHealthServiceConnections = 10

var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections)
var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections)

Expand Down Expand Up @@ -151,6 +156,9 @@ const peerShutdownDisconnectionAckDuration = 50 * time.Millisecond
// Contains {genesisID} param to be handled by gorilla/mux
const GossipNetworkPath = "/v1/{genesisID}/gossip"

// HealthServiceStatusPath is the path at which HealthService is registered as a handler when using gorilla/mux
const HealthServiceStatusPath = "/status"

// NodeInfo helps the network get information about the node it is running on
type NodeInfo interface {
// IsParticipating returns true if this node has stake and may vote on blocks or propose blocks.
Expand Down Expand Up @@ -684,7 +692,7 @@ func (wn *WebsocketNetwork) Start() {
}
// wrap the original listener with a limited connection listener
listener = limitlistener.RejectingLimitListener(
listener, uint64(wn.config.IncomingConnectionsLimit), wn.log)
listener, uint64(wn.config.IncomingConnectionsLimit)+ReservedHealthServiceConnections, wn.log)
// wrap the limited connection listener with a requests tracker listener
wn.listener = wn.requestsTracker.Listener(listener)
wn.log.Debugf("listening on %s", wn.listener.Addr().String())
Expand Down
8 changes: 3 additions & 5 deletions rpcs/healthService.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package rpcs

import (
"github.com/algorand/go-algorand/network"
"net/http"
)

// HealthServiceStatusPath is the path at which HealthService is registered as a handler when using gorilla/mux
const HealthServiceStatusPath = "/status"
"github.com/algorand/go-algorand/network"
)

// HealthService is a service that provides health information endpoints for the node
type HealthService struct{}
Expand All @@ -31,7 +29,7 @@ type HealthService struct{}
func MakeHealthService(net network.GossipNode) HealthService {
service := HealthService{}

net.RegisterHTTPHandler(HealthServiceStatusPath, service)
net.RegisterHTTPHandler(network.HealthServiceStatusPath, service)

return service
}
Expand Down
9 changes: 5 additions & 4 deletions rpcs/healthService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package rpcs

import (
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
"io"
"net/http"
"path"
"testing"

"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
)

func TestHealthService_ServeHTTP(t *testing.T) {
Expand All @@ -40,7 +41,7 @@ func TestHealthService_ServeHTTP(t *testing.T) {

client := http.Client{}

parsedURL.Path = path.Join(parsedURL.Path, HealthServiceStatusPath)
parsedURL.Path = path.Join(parsedURL.Path, network.HealthServiceStatusPath)

response, err := client.Get(parsedURL.String())
require.NoError(t, err)
Expand Down

0 comments on commit 985512b

Please sign in to comment.