Skip to content

Commit

Permalink
bitswap/httpnet: more improvements to Connect(), peer management
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Jan 24, 2025
1 parent eddd063 commit a70b2a1
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 87 deletions.
32 changes: 21 additions & 11 deletions bitswap/network/http_multiaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ import (
"github.com/multiformats/go-multiaddr"
)

// ParsedURL contains the result of parsing an "http" transport multiaddress.
// SNI is set when the multiaddress specifies an SNI value.
type ParsedURL struct {
URL *url.URL
SNI string
}

// ExtractHTTPAddress extracts the HTTP schema+host+port from a multiaddress
// and returns a *url.URL and an SNI string if present.
func ExtractHTTPAddress(ma multiaddr.Multiaddr) (*url.URL, string, error) {
func ExtractHTTPAddress(ma multiaddr.Multiaddr) (ParsedURL, error) {
components := ma.Protocols()
var host, port, schema, sni string
var tls bool
Expand All @@ -20,13 +27,13 @@ func ExtractHTTPAddress(ma multiaddr.Multiaddr) (*url.URL, string, error) {
case "dns", "dns4", "dns6", "ip4", "ip6":
hostVal, err := ma.ValueForProtocol(comp.Code)
if err != nil {
return nil, "", fmt.Errorf("failed to extract host: %w", err)
return ParsedURL{}, fmt.Errorf("failed to extract host: %w", err)
}
host = hostVal
case "tcp", "udp":
portVal, err := ma.ValueForProtocol(comp.Code)
if err != nil {
return nil, "", fmt.Errorf("failed to extract port: %w", err)
return ParsedURL{}, fmt.Errorf("failed to extract port: %w", err)
}
port = portVal
case "tls":
Expand All @@ -41,37 +48,40 @@ func ExtractHTTPAddress(ma multiaddr.Multiaddr) (*url.URL, string, error) {
case "sni":
sniVal, err := ma.ValueForProtocol(comp.Code)
if err != nil {
return nil, "", fmt.Errorf("failed to extract SNI: %w", err)
return ParsedURL{}, fmt.Errorf("failed to extract SNI: %w", err)
}
sni = sniVal
}
}

if host == "" || port == "" || schema == "" {
return nil, "", fmt.Errorf("multiaddress is missing required components (host/port/schema)")
return ParsedURL{}, fmt.Errorf("multiaddress is missing required components (host/port/schema)")
}

// Construct the URL object
address := fmt.Sprintf("%s://%s:%s", schema, host, port)
parsedURL, err := url.Parse(address)
if err != nil {
return nil, "", fmt.Errorf("failed to parse URL: %w", err)
return ParsedURL{}, fmt.Errorf("failed to parse URL: %w", err)
}

return parsedURL, sni, nil
return ParsedURL{
URL: parsedURL,
SNI: sni,
}, nil
}

// ExtractURLsFromPeer extracts all HTTP schema+host+port addresses as *url.URL from a peer.AddrInfo object.
func ExtractURLsFromPeer(info peer.AddrInfo) []*url.URL {
var addresses []*url.URL
func ExtractURLsFromPeer(info peer.AddrInfo) []ParsedURL {
var addresses []ParsedURL

for _, addr := range info.Addrs {
httpAddress, _, err := ExtractHTTPAddress(addr)
purl, err := ExtractHTTPAddress(addr)
if err != nil {
// Skip invalid or non-HTTP addresses but continue with others
continue
}
addresses = append(addresses, httpAddress)
addresses = append(addresses, purl)
}

return addresses
Expand Down
52 changes: 38 additions & 14 deletions bitswap/network/httpnet/cooldown.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,55 @@
package httpnet

import (
"net/url"
"sync"
"time"

"github.com/ipfs/boxo/bitswap/network"
)

type cooldownTracker struct {
maxBackoff time.Duration

urlsLock sync.RWMutex
urls map[string]time.Time

stop chan struct{}
}

func newCooldownTracker(maxBackoff time.Duration) *cooldownTracker {
return &cooldownTracker{
ct := &cooldownTracker{
maxBackoff: maxBackoff,
urls: make(map[string]time.Time),
stop: make(chan struct{}),
}

go ct.cleaner()
return ct
}

// every minute clean expired cooldowns.
func (ct *cooldownTracker) cleaner() {
tick := time.NewTicker(time.Minute)
for {
select {
case <-ct.stop:
return
case now := <-tick.C:
ct.urlsLock.Lock()
for host, dl := range ct.urls {
if dl.Before(now) {
delete(ct.urls, host)
}
}
ct.urlsLock.Unlock()
}
}
}

func (ct *cooldownTracker) stopCleaner() {
close(ct.stop)
}

func (ct *cooldownTracker) setByDate(host string, t time.Time) {
latestDate := time.Now().Add(ct.maxBackoff)
if t.After(latestDate) {
Expand All @@ -45,27 +75,21 @@ func (ct *cooldownTracker) remove(host string) {
ct.urlsLock.Unlock()
}

func (ct *cooldownTracker) fillSenderURLs(urls []*url.URL) []*senderURL {
func (ct *cooldownTracker) fillSenderURLs(urls []network.ParsedURL) []*senderURL {
now := time.Now()
surls := make([]*senderURL, len(urls))
ct.urlsLock.RLock()
{

for i, u := range urls {
var cooldown time.Time
dl, ok := ct.urls[u.Host]
if ok {
if now.Before(dl) {
cooldown = dl
} else {
// TODO: remove if we add a cleaning
// thread.
delete(ct.urls, u.Host)
}
dl, ok := ct.urls[u.URL.Host]
if ok && now.Before(dl) {
cooldown = dl
}
surls[i] = &senderURL{
url: u,
cooldown: cooldown,
ParsedURL: u,
cooldown: cooldown,
}
}
}
Expand Down
Loading

0 comments on commit a70b2a1

Please sign in to comment.