Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(libp2p): prioritize dialling peers that have been recently connected #2030

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions packages/libp2p/src/connection-manager/auto-dial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { logger } from '@libp2p/logger'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { PeerJobQueue } from '../utils/peer-job-queue.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_CONNECTED_TIMESTAMP, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { EventEmitter } from '@libp2p/interface/events'
import type { PeerStore } from '@libp2p/interface/peer-store'
Expand Down Expand Up @@ -249,9 +249,36 @@ export class AutoDial implements Startable {
return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs
})

log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length)
// sort peers that have been connected recently to the front of the queue
const peersToDial = peersThatHaveNotFailed.sort((a, b) => {
let lastDialTimestampA = 0
let lastDialTimestampB = 0

for (const peer of peersThatHaveNotFailed) {
const peerAMetadata = a.metadata.get(LAST_CONNECTED_TIMESTAMP)
const peerBMetadata = b.metadata.get(LAST_CONNECTED_TIMESTAMP)

if (peerAMetadata !== undefined) {
lastDialTimestampA = parseInt(uint8ArrayToString(peerAMetadata))
}

if (peerBMetadata !== undefined) {
lastDialTimestampB = parseInt(uint8ArrayToString(peerBMetadata))
}

if (lastDialTimestampA > lastDialTimestampB) {
return -1
}

if (lastDialTimestampB > lastDialTimestampA) {
return 1
}

return 0
})

log('selected %d/%d peers to dial', peersToDial.length, peers.length)

for (const peer of peersToDial) {
this.queue.add(async () => {
const numConnections = this.connectionManager.getConnectionsMap().size

Expand Down
2 changes: 2 additions & 0 deletions packages/libp2p/src/connection-manager/constants.defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ export const MAX_INCOMING_PENDING_CONNECTIONS = 10
* failed to dial.
*/
export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure'

export const LAST_CONNECTED_TIMESTAMP = 'last-connected-timestamp'
14 changes: 13 additions & 1 deletion packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { RateLimiterMemory } from 'rate-limiter-flexible'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
import { ConnectionPruner } from './connection-pruner.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, LAST_CONNECTED_TIMESTAMP, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { DialQueue } from './dial-queue.js'
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions } from '@libp2p/interface'
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -540,6 +541,17 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
peerConnections.push(connection)
}

// Set connected peers as recently dialled
try {
await this.peerStore.patch(connection.remotePeer, {
metadata: {
[LAST_CONNECTED_TIMESTAMP]: uint8ArrayFromString(Date.now().toString())
}
})
} catch (err: any) {
log.error('could not update last connected timestamp for peer %p', connection.remotePeer, err)
}

return connection
}

Expand Down
53 changes: 52 additions & 1 deletion packages/libp2p/test/connection-manager/auto-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { AutoDial } from '../../src/connection-manager/auto-dial.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { LAST_CONNECTED_TIMESTAMP, LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { matchPeerId } from '../fixtures/match-peer-id.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -291,4 +291,55 @@ describe('auto-dial', () => {
// should have retried the unreachable peer
expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true()
})

it('should prioritize peers which have been successfully connected', async () => {
const recentlyConnectedPeer: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'),
isCertified: true
}],
metadata: new Map([[LAST_CONNECTED_TIMESTAMP, uint8ArrayFromString(`${Date.now()}`)]]),
tags: new Map()
}

const neverConnectedPeer: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'),
isCertified: true
}],
metadata: new Map(),
tags: new Map()
}

await peerStore.save(neverConnectedPeer.id, neverConnectedPeer)
await peerStore.save(recentlyConnectedPeer.id, recentlyConnectedPeer)

const connectionManager = stubInterface<ConnectionManager>({
getConnectionsMap: new PeerMap(),
getDialQueue: []
})

autoDialler = new AutoDial({
peerStore,
connectionManager,
events
}, {
minConnections: 10
})

autoDialler.start()

void autoDialler.autoDial()

await pWaitFor(() => {
return connectionManager.openConnection.callCount === 2
})

expect(connectionManager.openConnection.getCall(0).args[0].toString()).to.equal(recentlyConnectedPeer.id.toString())
expect(connectionManager.openConnection.getCall(1).args[0].toString()).to.equal(neverConnectedPeer.id.toString())
})
})
22 changes: 20 additions & 2 deletions packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { pEvent } from 'p-event'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { defaultComponents, type Components } from '../../src/components.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { LAST_CONNECTED_TIMESTAMP, LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { type IdentifyService, identifyService } from '../../src/identify/index.js'
Expand Down Expand Up @@ -117,6 +117,23 @@ describe('dialing (direct, WebSockets)', () => {
expect(peer.metadata.has(LAST_DIAL_FAILURE_KEY)).to.be.true()
})

it('should mark a peer as having recently connected', async () => {
connectionManager = new DefaultConnectionManager(localComponents)
await connectionManager.start()

const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.peerStore.patch(remotePeerId, {
multiaddrs: [remoteAddr]
})

const connection = await connectionManager.openConnection(remotePeerId)
expect(connection).to.exist()

const peer = await localComponents.peerStore.get(remoteComponents.peerId)

expect(peer.metadata.has(LAST_CONNECTED_TIMESTAMP)).to.be.true()
})

it('should be able to connect to a given peer', async () => {
connectionManager = new DefaultConnectionManager(localComponents)
await connectionManager.start()
Expand Down Expand Up @@ -412,7 +429,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
expect(identifySpy.callCount).to.equal(1)
await identifySpy.firstCall.returnValue

expect(peerStorePatchSpy.callCount).to.equal(1)
// Account for the peer store being patched by recently dialed peer
expect(peerStorePatchSpy.callCount).to.equal(2)

await libp2p.stop()
})
Expand Down
Loading