From a70b2a147684605e0b502be7f73d3e6365d397d3 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 24 Jan 2025 12:26:27 +0100 Subject: [PATCH] bitswap/httpnet: more improvements to Connect(), peer management --- bitswap/network/http_multiaddr.go | 32 +++-- bitswap/network/httpnet/cooldown.go | 52 +++++--- bitswap/network/httpnet/httpnet.go | 163 +++++++++++++++++--------- bitswap/network/httpnet/msg_sender.go | 5 +- bitswap/network/httpnet/pinger.go | 4 +- bitswap/network/router.go | 40 ++++++- 6 files changed, 209 insertions(+), 87 deletions(-) diff --git a/bitswap/network/http_multiaddr.go b/bitswap/network/http_multiaddr.go index 3f13232fb..1db05599b 100644 --- a/bitswap/network/http_multiaddr.go +++ b/bitswap/network/http_multiaddr.go @@ -8,9 +8,16 @@ import ( "github.com/multiformats/go-multiaddr" ) +// ParsedURL contains the result of parsing an "http" transport multiaddress. +// SNI is set when the multiaddress specifies an SNI value. +type ParsedURL struct { + URL *url.URL + SNI string +} + // ExtractHTTPAddress extracts the HTTP schema+host+port from a multiaddress // and returns a *url.URL and an SNI string if present. 
-func ExtractHTTPAddress(ma multiaddr.Multiaddr) (*url.URL, string, error) { +func ExtractHTTPAddress(ma multiaddr.Multiaddr) (ParsedURL, error) { components := ma.Protocols() var host, port, schema, sni string var tls bool @@ -20,13 +27,13 @@ func ExtractHTTPAddress(ma multiaddr.Multiaddr) (*url.URL, string, error) { case "dns", "dns4", "dns6", "ip4", "ip6": hostVal, err := ma.ValueForProtocol(comp.Code) if err != nil { - return nil, "", fmt.Errorf("failed to extract host: %w", err) + return ParsedURL{}, fmt.Errorf("failed to extract host: %w", err) } host = hostVal case "tcp", "udp": portVal, err := ma.ValueForProtocol(comp.Code) if err != nil { - return nil, "", fmt.Errorf("failed to extract port: %w", err) + return ParsedURL{}, fmt.Errorf("failed to extract port: %w", err) } port = portVal case "tls": @@ -41,37 +48,40 @@ func ExtractHTTPAddress(ma multiaddr.Multiaddr) (*url.URL, string, error) { case "sni": sniVal, err := ma.ValueForProtocol(comp.Code) if err != nil { - return nil, "", fmt.Errorf("failed to extract SNI: %w", err) + return ParsedURL{}, fmt.Errorf("failed to extract SNI: %w", err) } sni = sniVal } } if host == "" || port == "" || schema == "" { - return nil, "", fmt.Errorf("multiaddress is missing required components (host/port/schema)") + return ParsedURL{}, fmt.Errorf("multiaddress is missing required components (host/port/schema)") } // Construct the URL object address := fmt.Sprintf("%s://%s:%s", schema, host, port) parsedURL, err := url.Parse(address) if err != nil { - return nil, "", fmt.Errorf("failed to parse URL: %w", err) + return ParsedURL{}, fmt.Errorf("failed to parse URL: %w", err) } - return parsedURL, sni, nil + return ParsedURL{ + URL: parsedURL, + SNI: sni, + }, nil } // ExtractURLsFromPeer extracts all HTTP schema+host+port addresses as *url.URL from a peer.AddrInfo object. 
-func ExtractURLsFromPeer(info peer.AddrInfo) []*url.URL { - var addresses []*url.URL +func ExtractURLsFromPeer(info peer.AddrInfo) []ParsedURL { + var addresses []ParsedURL for _, addr := range info.Addrs { - httpAddress, _, err := ExtractHTTPAddress(addr) + purl, err := ExtractHTTPAddress(addr) if err != nil { // Skip invalid or non-HTTP addresses but continue with others continue } - addresses = append(addresses, httpAddress) + addresses = append(addresses, purl) } return addresses diff --git a/bitswap/network/httpnet/cooldown.go b/bitswap/network/httpnet/cooldown.go index d3f5e8a24..c8a493950 100644 --- a/bitswap/network/httpnet/cooldown.go +++ b/bitswap/network/httpnet/cooldown.go @@ -1,9 +1,10 @@ package httpnet import ( - "net/url" "sync" "time" + + "github.com/ipfs/boxo/bitswap/network" ) type cooldownTracker struct { @@ -11,15 +12,44 @@ type cooldownTracker struct { urlsLock sync.RWMutex urls map[string]time.Time + + stop chan struct{} } func newCooldownTracker(maxBackoff time.Duration) *cooldownTracker { - return &cooldownTracker{ + ct := &cooldownTracker{ maxBackoff: maxBackoff, urls: make(map[string]time.Time), + stop: make(chan struct{}), + } + + go ct.cleaner() + return ct +} + +// every minute clean expired cooldowns. 
+func (ct *cooldownTracker) cleaner() { + tick := time.NewTicker(time.Minute) + for { + select { + case <-ct.stop: + return + case now := <-tick.C: + ct.urlsLock.Lock() + for host, dl := range ct.urls { + if dl.Before(now) { + delete(ct.urls, host) + } + } + ct.urlsLock.Unlock() + } } } +func (ct *cooldownTracker) stopCleaner() { + close(ct.stop) +} + func (ct *cooldownTracker) setByDate(host string, t time.Time) { latestDate := time.Now().Add(ct.maxBackoff) if t.After(latestDate) { @@ -45,7 +75,7 @@ func (ct *cooldownTracker) remove(host string) { ct.urlsLock.Unlock() } -func (ct *cooldownTracker) fillSenderURLs(urls []*url.URL) []*senderURL { +func (ct *cooldownTracker) fillSenderURLs(urls []network.ParsedURL) []*senderURL { now := time.Now() surls := make([]*senderURL, len(urls)) ct.urlsLock.RLock() @@ -53,19 +83,13 @@ func (ct *cooldownTracker) fillSenderURLs(urls []*url.URL) []*senderURL { for i, u := range urls { var cooldown time.Time - dl, ok := ct.urls[u.Host] - if ok { - if now.Before(dl) { - cooldown = dl - } else { - // TODO: remove if we add a cleaning - // thread. - delete(ct.urls, u.Host) - } + dl, ok := ct.urls[u.URL.Host] + if ok && now.Before(dl) { + cooldown = dl } surls[i] = &senderURL{ - url: u, - cooldown: cooldown, + ParsedURL: u, + cooldown: cooldown, } } } diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go index 9dd70101d..b05550101 100644 --- a/bitswap/network/httpnet/httpnet.go +++ b/bitswap/network/httpnet/httpnet.go @@ -7,7 +7,6 @@ import ( "crypto/tls" "errors" "fmt" - "math/rand/v2" "net" "net/http" "net/url" @@ -24,6 +23,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/multiformats/go-multiaddr" ) var log = logging.Logger("httpnet") @@ -35,14 +35,16 @@ var _ network.BitSwapNetwork = (*Network)(nil) // Defaults for the different options var ( - DefaultMaxBlockSize int64 = 2 << 20 // 2MiB. 
- DefaultUserAgent = defaultUserAgent() // Usually will result in a "boxo@commitID" - DefaultIdleConnTimeout = 30 * time.Second - DefaultResponseHeaderTimeout = 10 * time.Second - DefaultMaxIdleConns = 50 - DefaultSupportsHave = false - DefaultInsecureSkipVerify = false - DefaultMaxBackoff = time.Minute + DefaultMaxBlockSize int64 = 2 << 20 // 2MiB. + DefaultUserAgent = defaultUserAgent() // Usually will result in a "boxo@commitID" + DefaultDialTimeout = 3 * time.Second + DefaultIdleConnTimeout = 30 * time.Second + DefaultResponseHeaderTimeout = 10 * time.Second + DefaultMaxIdleConns = 50 + DefaultSupportsHave = false + DefaultInsecureSkipVerify = false + DefaultMaxBackoff = time.Minute + DefaultMaxHTTPAddressesPerPeer = 10 ) // Option allows to configure the Network. @@ -62,6 +64,13 @@ func WithMaxBlockSize(size int64) Option { } } +// WithDialTimeout sets the maximum time to wait for a connection to be set up. +func WithDialTimeout(t time.Duration) Option { + return func(net *Network) { + net.dialTimeout = t + } +} + // WithIdleConnTimeout sets how long to keep connections alive before closing // them when no requests happen. func WithIdleConnTimeout(t time.Duration) Option { @@ -115,6 +124,14 @@ func WithAllowlist(hosts []string) Option { } } +// WithMaxHTTPAddressesPerPeer limits how many http addresses we attempt to +// connect to per peer. +func WithMaxHTTPAddressesPerPeer(max int) Option { + return func(net *Network) { + net.maxHTTPAddressesPerPeer = max + } +} + type Network struct { // NOTE: Stats must be at the top of the heap allocation to ensure 64bit // alignment. 
@@ -130,14 +147,16 @@ type Network struct { cooldownTracker *cooldownTracker // options - userAgent string - maxBlockSize int64 - idleConnTimeout time.Duration - responseHeaderTimeout time.Duration - maxIdleConns int - supportsHave bool - insecureSkipVerify bool - allowlist map[string]struct{} + userAgent string + maxBlockSize int64 + dialTimeout time.Duration + idleConnTimeout time.Duration + responseHeaderTimeout time.Duration + maxIdleConns int + supportsHave bool + insecureSkipVerify bool + maxHTTPAddressesPerPeer int + allowlist map[string]struct{} metrics *metrics } @@ -145,16 +164,18 @@ type Network struct { // New returns a BitSwapNetwork supported by underlying IPFS host. func New(host host.Host, opts ...Option) network.BitSwapNetwork { htnet := &Network{ - host: host, - pinger: newPinger(host), - userAgent: defaultUserAgent(), - maxBlockSize: DefaultMaxBlockSize, - idleConnTimeout: DefaultIdleConnTimeout, - responseHeaderTimeout: DefaultResponseHeaderTimeout, - maxIdleConns: DefaultMaxIdleConns, - supportsHave: DefaultSupportsHave, - insecureSkipVerify: DefaultInsecureSkipVerify, - metrics: newMetrics(), + host: host, + pinger: newPinger(host), + userAgent: defaultUserAgent(), + maxBlockSize: DefaultMaxBlockSize, + dialTimeout: DefaultDialTimeout, + idleConnTimeout: DefaultIdleConnTimeout, + responseHeaderTimeout: DefaultResponseHeaderTimeout, + maxIdleConns: DefaultMaxIdleConns, + supportsHave: DefaultSupportsHave, + insecureSkipVerify: DefaultInsecureSkipVerify, + maxHTTPAddressesPerPeer: DefaultMaxHTTPAddressesPerPeer, + metrics: newMetrics(), } for _, opt := range opts { @@ -169,7 +190,7 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork { netdialer := &net.Dialer{ // Timeout for connects to complete. - Timeout: 5 * time.Second, + Timeout: htnet.dialTimeout, // KeepAlive config for sending probes for an active // connection. 
KeepAliveConfig: net.KeepAliveConfig{ @@ -239,6 +260,7 @@ func (ht *Network) Start(receivers ...network.Receiver) { // Other methods should no longer be used after calling Stop(). func (ht *Network) Stop() { ht.connEvtMgr.Stop() + ht.cooldownTracker.stopCleaner() } // Ping triggers a ping to the given peer and returns the latency. @@ -300,32 +322,39 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error { return ErrNoHTTPAddresses } + // avoid funny things like someone adding 100 broken urls to a peer. + htaddrs.Addrs = htaddrs.Addrs[:min(len(htaddrs.Addrs), ht.maxHTTPAddressesPerPeer)] + urls := network.ExtractURLsFromPeer(htaddrs) if len(ht.allowlist) > 0 { - var filteredURLs []*url.URL - for _, u := range urls { - host, _, err := net.SplitHostPort(u.Host) + var filteredURLs []network.ParsedURL + var filteredAddrs []multiaddr.Multiaddr + for i, u := range urls { + host, _, err := net.SplitHostPort(u.URL.Host) if err != nil { return err } if _, ok := ht.allowlist[host]; ok { filteredURLs = append(filteredURLs, u) + filteredAddrs = append(filteredAddrs, htaddrs.Addrs[i]) } } urls = filteredURLs + htaddrs.Addrs = filteredAddrs } - // if filteredURLs == 0 nothing will happen below and we will return - // an error. + // if len(filteredURLs) == 0 nothing will happen below and we will return + // an error. - rand.Shuffle(len(urls), func(i, j int) { - urls[i], urls[j] = urls[j], urls[i] - }) - - // We will know try to talk to this peer by making an HTTP request. - // This allows re-using the connection that we are about to open next - // time with the client. The dialer callbacks will call peer.Connected() + // We will now try to talk to this peer by making HTTP requests to its urls + // and recording which ones work. + // This allows re-using the connections that we are about to open next + // time with the client. We call peer.Connected() // on success. 
- for _, u := range urls { + // + // TODO: Decide whether we want to connect to all, or just try until + // we find a working one. + var workingAddrs []multiaddr.Multiaddr + for i, u := range urls { req, err := ht.buildRequest(ctx, p.ID, u, "GET", "bafyaabakaieac") if err != nil { log.Debug(err) @@ -344,17 +373,21 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error { } if resp.StatusCode >= 500 { // 5xx // We made a proper request and got a 5xx back. - // We cannot consider this a successful connection. + // We cannot consider this a working connection. continue } - ht.host.Peerstore().AddAddrs(p.ID, htaddrs.Addrs, peerstore.PermanentAddrTTL) + workingAddrs = append(workingAddrs, htaddrs.Addrs[i]) + } + + if len(workingAddrs) > 0 { + ht.host.Peerstore().AddAddrs(p.ID, workingAddrs, peerstore.PermanentAddrTTL) ht.connEvtMgr.Connected(p.ID) ht.pinger.startPinging(p.ID) + // We "connected" return nil - // otherwise keep trying other urls. We don't care about the - // http status code as long as the request succeeded. } + err := fmt.Errorf("%w: %s", ErrNoSuccess, p.ID) log.Debug(err) return err @@ -365,24 +398,45 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error { // peerstore. func (ht *Network) DisconnectFrom(ctx context.Context, p peer.ID) error { // this kills all ongoing requests which is more or less equivalent. - ht.connEvtMgr.Disconnected(p) - ht.pinger.stopPinging(p) + pi := ht.host.Peerstore().PeerInfo(p) + _, bsaddrs := network.SplitHTTPAddrs(pi) ht.host.Peerstore().ClearAddrs(p) + if len(bsaddrs.Addrs) == 0 { + ht.connEvtMgr.Disconnected(p) + } else { // re-add bitswap addresses + // unfortunately we cannot maintain ttl info + ht.host.Peerstore().SetAddrs(p, bsaddrs.Addrs, peerstore.TempAddrTTL) + } + ht.pinger.stopPinging(p) + + // coolDownTracker: we leave untouched. We want to keep + // ongoing cooldowns there in case we reconnect to this peer. 
+ return nil } -// ** We have no way of protecting a connection from our side other than using -// it so that it does not idle and gets closed. - +// TagPeer uses the host's ConnManager to tag a peer. func (ht *Network) TagPeer(p peer.ID, tag string, w int) { + ht.host.ConnManager().TagPeer(p, tag, w) } + +// UntagPeer uses the host's ConnManager to untag a peer. func (ht *Network) UntagPeer(p peer.ID, tag string) { + ht.host.ConnManager().UntagPeer(p, tag) } +// Protect does nothing. The purpose of Protect is to maintain connections as +// long as they are used. But our connections are already maintained as long +// as they are, and closed when not. func (ht *Network) Protect(p peer.ID, tag string) { } + +// Unprotect does nothing. The purpose of Unprotect is to be able to close +// connections when they are no longer relevant. Our connections are already +// closed when they are not used. It always returns true as technically our +// connections are potentially still protected as long as they are used. func (ht *Network) Unprotect(p peer.ID, tag string) bool { - return false + return true } // Stats returns message counts for this peer. Each message sent is an HTTP @@ -395,9 +449,9 @@ func (ht *Network) Stats() network.Stats { } // buildRequests sets up common settings for making a requests. 
-func (ht *Network) buildRequest(ctx context.Context, pid peer.ID, u *url.URL, method string, cid string) (*http.Request, error) { +func (ht *Network) buildRequest(ctx context.Context, pid peer.ID, u network.ParsedURL, method string, cid string) (*http.Request, error) { // copy url - sendURL, _ := url.Parse(u.String()) + sendURL, _ := url.Parse(u.URL.String()) sendURL.RawQuery = "format=raw" sendURL.Path += "/ipfs/" + cid @@ -414,6 +468,9 @@ func (ht *Network) buildRequest(ctx context.Context, pid peer.ID, u *url.URL, me headers := make(http.Header) headers.Add("Accept", "application/vnd.ipld.raw") headers.Add("User-Agent", ht.userAgent) + if u.SNI != "" { + req.Host = u.SNI + } req.Header = headers return req, nil } diff --git a/bitswap/network/httpnet/msg_sender.go b/bitswap/network/httpnet/msg_sender.go index a14614fbb..a7e2816b3 100644 --- a/bitswap/network/httpnet/msg_sender.go +++ b/bitswap/network/httpnet/msg_sender.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net/http" - "net/url" "slices" "strconv" "sync" @@ -61,7 +60,7 @@ func setSenderOpts(opts *network.MessageSenderOpts) network.MessageSenderOpts { // senderURL wraps url with information about cooldowns and errors. 
type senderURL struct { - url *url.URL + network.ParsedURL cooldown time.Time clientErrors int serverErrors int @@ -184,7 +183,7 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm panic("unknown bitswap entry type") } - req, err := sender.ht.buildRequest(ctx, sender.peer, u.url, method, entry.Cid.String()) + req, err := sender.ht.buildRequest(ctx, sender.peer, u.ParsedURL, method, entry.Cid.String()) if err != nil { return &senderError{ Type: typeFatal, diff --git a/bitswap/network/httpnet/pinger.go b/bitswap/network/httpnet/pinger.go index 604c14f65..969fdd578 100644 --- a/bitswap/network/httpnet/pinger.go +++ b/bitswap/network/httpnet/pinger.go @@ -51,7 +51,7 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { for _, u := range urls { go func(u *url.URL) { // Remove port from url. - host, _, err := net.SplitHostPort(urls[0].Host) + host, _, err := net.SplitHostPort(u.Host) if err != nil { results <- ping.Result{ Error: err, @@ -78,7 +78,7 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { results <- ping.Result{ RTT: pinger.Statistics().AvgRtt, } - }(u) + }(u.URL) } var result ping.Result diff --git a/bitswap/network/router.go b/bitswap/network/router.go index 4f95bee45..ede96a3ea 100644 --- a/bitswap/network/router.go +++ b/bitswap/network/router.go @@ -118,19 +118,51 @@ func (rt *router) NewMessageSender(ctx context.Context, p peer.ID, opts *Message } func (rt *router) TagPeer(p peer.ID, tag string, w int) { - rt.HTTP.TagPeer(p, tag, w) + // tag once only if they are the same. + if rt.HTTP.Self() == rt.Bitswap.Self() { + rt.HTTP.TagPeer(p, tag, w) + return + } + + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + rt.HTTP.TagPeer(p, tag, w) + return + } rt.Bitswap.TagPeer(p, tag, w) } func (rt *router) UntagPeer(p peer.ID, tag string) { - rt.HTTP.UntagPeer(p, tag) + // untag once only if they are the same. 
+ if rt.HTTP.Self() == rt.Bitswap.Self() { + rt.HTTP.UntagPeer(p, tag) + return + } + + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + rt.HTTP.UntagPeer(p, tag) + return + } rt.Bitswap.UntagPeer(p, tag) } func (rt *router) Protect(p peer.ID, tag string) { - rt.HTTP.Protect(p, tag) + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + rt.HTTP.Protect(p, tag) + return + } rt.Bitswap.Protect(p, tag) } func (rt *router) Unprotect(p peer.ID, tag string) bool { - return rt.HTTP.Unprotect(p, tag) || rt.Bitswap.Unprotect(p, tag) + pi := rt.Peerstore.PeerInfo(p) + htaddrs, _ := SplitHTTPAddrs(pi) + if len(htaddrs.Addrs) > 0 { + return rt.HTTP.Unprotect(p, tag) + } + return rt.Bitswap.Unprotect(p, tag) }