Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feat/track-dial-fai…
Browse files Browse the repository at this point in the history
…lures-per-address
  • Loading branch information
achingbrain committed Nov 3, 2023
2 parents 3296f11 + 025c082 commit 308097f
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 46 deletions.
2 changes: 1 addition & 1 deletion packages/interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"merge-options": "^3.0.4",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-limit": "^4.0.0",
"p-limit": "^5.0.0",
"p-wait-for": "^5.0.2",
"protons-runtime": "^5.0.0",
"sinon": "^17.0.0",
Expand Down
24 changes: 3 additions & 21 deletions packages/libp2p/src/circuit-relay/transport/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ import { CodeError } from '@libp2p/interface/errors'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import type { ReservationStore } from './reservation-store.js'
import type { Connection } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Listener, ListenerEvents } from '@libp2p/interface/transport'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
Expand Down Expand Up @@ -41,25 +39,9 @@ class CircuitRelayTransportListener extends TypedEventEmitter<ListenerEvents> im
async listen (addr: Multiaddr): Promise<void> {
log('listen on %a', addr)

const relayPeerStr = addr.getPeerId()
let relayConn: Connection | undefined

// check if we already have a connection to the relay
if (relayPeerStr != null) {
const relayPeer = peerIdFromString(relayPeerStr)
const connections = this.connectionManager.getConnectionsMap().get(relayPeer) ?? []

if (connections.length > 0) {
relayConn = connections[0]
}
}

// open a new connection as we don't already have one
if (relayConn == null) {
const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const ma = multiaddr(addrString)
relayConn = await this.connectionManager.openConnection(ma)
}
// remove the circuit part to get the peer id of the relay
const relayAddr = addr.decapsulate('/p2p-circuit')
const relayConn = await this.connectionManager.openConnection(relayAddr)

if (!this.relayStore.hasReservation(relayConn.remotePeer)) {
// addRelay calls transportManager.listen which calls this listen method
Expand Down
45 changes: 44 additions & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AbortError, CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { type Multiaddr, type Resolver, resolvers } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
Expand Down Expand Up @@ -35,6 +36,7 @@ export interface PendingDialTarget {

export interface DialOptions extends AbortOptions {
priority?: number
force?: boolean
}

interface PendingDialInternal extends PendingDial {
Expand All @@ -48,6 +50,7 @@ interface DialerInit {
maxParallelDialsPerPeer?: number
dialTimeout?: number
resolvers?: Record<string, Resolver>
connections?: PeerMap<Connection[]>
}

const defaultOptions = {
Expand Down Expand Up @@ -83,12 +86,14 @@ export class DialQueue {
private readonly inProgressDialCount?: Metric
private readonly pendingDialCount?: Metric
private readonly shutDownController: AbortController
private readonly connections: PeerMap<Connection[]>

constructor (components: DialQueueComponents, init: DialerInit = {}) {
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
this.connections = init.connections ?? new PeerMap()

this.peerId = components.peerId
this.peerStore = components.peerStore
Expand Down Expand Up @@ -186,6 +191,23 @@ export class DialQueue {
throw err
}

// make sure we don't have an existing connection to any of the addresses we
// are about to dial
let existingConnection = Array.from(this.connections.values()).flat().find(conn => {
if (options.force === true) {
return false
}

return addrsToDial.find(addr => {
return addr.multiaddr.equals(conn.remoteAddr)
})
})

if (existingConnection != null) {
log('already connected to %a', existingConnection.remoteAddr)
return existingConnection
}

// ready to dial, all async work finished - make sure we don't have any
// pending dials in progress for this peer or set of multiaddrs
const existingDial = this.pendingDials.find(dial => {
Expand Down Expand Up @@ -256,7 +278,28 @@ export class DialQueue {
// let other dials join this one
this.pendingDials.push(pendingDial)

return pendingDial.promise
const connection = await pendingDial.promise

// we may have been dialing a multiaddr without a peer id attached but by
// this point we have upgraded the connection so the remote peer information
// should be available - check again that we don't already have a connection
// to the remote multiaddr
existingConnection = Array.from(this.connections.values()).flat().find(conn => {
if (options.force === true) {
return false
}

return conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)
})

if (existingConnection != null) {
log('already connected to %a', existingConnection.remoteAddr)
await connection.close()
return existingConnection
}

log('connection opened to %a', connection.remoteAddr)
return connection
}

private createDialAbortControllers (userSignal?: AbortSignal): ClearableSignal {
Expand Down
12 changes: 7 additions & 5 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
resolvers: init.resolvers ?? {
dnsaddr: dnsaddrResolver
}
},
connections: this.connections
})
}

Expand Down Expand Up @@ -505,12 +506,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

if (peerId != null && options.force !== true) {
log('dial %p', peerId)
const existingConnections = this.getConnections(peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => !conn.transient)

if (existingConnections.length > 0) {
log('had an existing connection to %p', peerId)
if (existingConnection != null) {
log('had an existing non-transient connection to %p', peerId)

return existingConnections[0]
return existingConnection
}
}

Expand Down
57 changes: 40 additions & 17 deletions packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type { Libp2pEvents, IdentifyResult, SignedPeerRecord, AbortOptions } fro
import type { Connection, Stream } from '@libp2p/interface/connection'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Peer, PeerStore } from '@libp2p/interface/peer-store'
import type { Peer, PeerData, PeerStore } from '@libp2p/interface/peer-store'
import type { Startable } from '@libp2p/interface/startable'
import type { AddressManager } from '@libp2p/interface-internal/address-manager'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
Expand Down Expand Up @@ -404,16 +404,29 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
log('received identify from %p', connection.remotePeer)

if (message == null) {
throw new Error('Message was null or undefined')
throw new CodeError('message was null or undefined', 'ERR_INVALID_MESSAGE')
}

const peer = {
addresses: message.listenAddrs.map(buf => ({
const peer: PeerData = {}

if (message.listenAddrs.length > 0) {
peer.addresses = message.listenAddrs.map(buf => ({
multiaddr: multiaddr(buf)
})),
protocols: message.protocols,
metadata: new Map(),
peerRecordEnvelope: message.signedPeerRecord
}))
}

if (message.protocols.length > 0) {
peer.protocols = message.protocols
}

if (message.publicKey != null) {
peer.publicKey = message.publicKey

const peerId = await peerIdFromKeys(message.publicKey)

if (!peerId.equals(connection.remotePeer)) {
throw new CodeError('public key did not match remote PeerId', 'ERR_INVALID_PUBLIC_KEY')
}
}

let output: SignedPeerRecord | undefined
Expand All @@ -428,12 +441,12 @@ export class DefaultIdentifyService implements Startable, IdentifyService {

// Verify peerId
if (!peerRecord.peerId.equals(envelope.peerId)) {
throw new Error('signing key does not match PeerId in the PeerRecord')
throw new CodeError('signing key does not match PeerId in the PeerRecord', 'ERR_INVALID_SIGNING_KEY')
}

// Make sure remote peer is the one sending the record
if (!connection.remotePeer.equals(peerRecord.peerId)) {
throw new Error('signing key does not match remote PeerId')
throw new CodeError('signing key does not match remote PeerId', 'ERR_INVALID_PEER_RECORD_KEY')
}

let existingPeer: Peer | undefined
Expand Down Expand Up @@ -481,15 +494,25 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
log('%p did not send a signed peer record', connection.remotePeer)
}

if (message.agentVersion != null) {
peer.metadata.set('AgentVersion', uint8ArrayFromString(message.agentVersion))
}
log('patching %p with', peer)
await this.peerStore.patch(connection.remotePeer, peer)

if (message.protocolVersion != null) {
peer.metadata.set('ProtocolVersion', uint8ArrayFromString(message.protocolVersion))
}
if (message.agentVersion != null || message.protocolVersion != null) {
const metadata: Record<string, Uint8Array> = {}

await this.peerStore.patch(connection.remotePeer, peer)
if (message.agentVersion != null) {
metadata.AgentVersion = uint8ArrayFromString(message.agentVersion)
}

if (message.protocolVersion != null) {
metadata.ProtocolVersion = uint8ArrayFromString(message.protocolVersion)
}

log('updating %p metadata', peer)
await this.peerStore.merge(connection.remotePeer, {
metadata
})
}

const result: IdentifyResult = {
peerId: connection.remotePeer,
Expand Down
36 changes: 36 additions & 0 deletions packages/libp2p/test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,40 @@ describe('Connection Manager', () => {
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.true()
})

it('should allow dialing peers when an existing transient connection exists', async () => {
connectionManager = new DefaultConnectionManager({
peerId: await createEd25519PeerId(),
peerStore: stubInterface<PeerStore>(),
transportManager: stubInterface<TransportManager>(),
connectionGater: stubInterface<ConnectionGater>(),
events: new TypedEventEmitter()
}, {
...defaultOptions,
maxIncomingPendingConnections: 1
})
await connectionManager.start()

const targetPeer = await createEd25519PeerId()
const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`)

const existingConnection = stubInterface<Connection>({
transient: true
})
const newConnection = stubInterface<Connection>()

sinon.stub(connectionManager.dialQueue, 'dial')
.withArgs(addr)
.resolves(newConnection)

// we have an existing transient connection
const map = connectionManager.getConnectionsMap()
map.set(targetPeer, [
existingConnection
])

const conn = await connectionManager.openConnection(addr)

expect(conn).to.equal(newConnection)
})
})
Loading

0 comments on commit 308097f

Please sign in to comment.