From dd400cd57bd4943469af1ffc67b235a46c2b206c Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 2 Nov 2023 14:44:36 +0000 Subject: [PATCH 1/3] fix: allow dialing a peer when we only have transient connections (#2187) If we dial a peer that we already have a connection to, we return that connection instead of dialing. This creates a problem when we are trying to make a direct connection to a peer that we already have a relay connection to since the relay connection is returned. The fix here is to allow additional dials to peers when we only have relay connections. If a direct connection exists, we will still return it instead of dialing. --- .../src/circuit-relay/transport/listener.ts | 24 ++-------- .../src/connection-manager/dial-queue.ts | 45 ++++++++++++++++++- .../libp2p/src/connection-manager/index.ts | 12 ++--- .../test/connection-manager/index.spec.ts | 36 +++++++++++++++ 4 files changed, 90 insertions(+), 27 deletions(-) diff --git a/packages/libp2p/src/circuit-relay/transport/listener.ts b/packages/libp2p/src/circuit-relay/transport/listener.ts index 29b6c6e608..d7718c7698 100644 --- a/packages/libp2p/src/circuit-relay/transport/listener.ts +++ b/packages/libp2p/src/circuit-relay/transport/listener.ts @@ -2,10 +2,8 @@ import { CodeError } from '@libp2p/interface/errors' import { TypedEventEmitter } from '@libp2p/interface/events' import { logger } from '@libp2p/logger' import { PeerMap } from '@libp2p/peer-collections' -import { peerIdFromString } from '@libp2p/peer-id' import { multiaddr } from '@multiformats/multiaddr' import type { ReservationStore } from './reservation-store.js' -import type { Connection } from '@libp2p/interface/connection' import type { PeerId } from '@libp2p/interface/peer-id' import type { Listener, ListenerEvents } from '@libp2p/interface/transport' import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' @@ -41,25 +39,9 @@ class CircuitRelayTransportListener extends TypedEventEmitter im async listen (addr: Multiaddr): Promise { log('listen on %a', addr) - const relayPeerStr = addr.getPeerId() - let relayConn: Connection | undefined - - // check if we already have a connection to the relay - if (relayPeerStr != null) { - const relayPeer = peerIdFromString(relayPeerStr) - const connections = this.connectionManager.getConnectionsMap().get(relayPeer) ?? [] - - if (connections.length > 0) { - relayConn = connections[0] - } - } - - // open a new connection as we don't already have one - if (relayConn == null) { - const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '') - const ma = multiaddr(addrString) - relayConn = await this.connectionManager.openConnection(ma) - } + // remove the circuit part to get the peer id of the relay + const relayAddr = addr.decapsulate('/p2p-circuit') + const relayConn = await this.connectionManager.openConnection(relayAddr) if (!this.relayStore.hasReservation(relayConn.remotePeer)) { // addRelay calls transportManager.listen which calls this listen method diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 232d2e39e1..280e677c02 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -1,6 +1,7 @@ import { AbortError, CodeError } from '@libp2p/interface/errors' import { setMaxListeners } from '@libp2p/interface/events' import { logger } from '@libp2p/logger' +import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' import { type Multiaddr, type Resolver, resolvers } from '@multiformats/multiaddr' import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' @@ -35,6 +36,7 @@ export interface PendingDialTarget { export interface DialOptions extends AbortOptions { priority?: number + force?: boolean } interface PendingDialInternal extends PendingDial { @@ -48,6 +50,7 @@ interface DialerInit { maxParallelDialsPerPeer?: number dialTimeout?: number resolvers?: Record + connections?: PeerMap } const defaultOptions = { @@ -83,12 +86,14 @@ export class DialQueue { private readonly inProgressDialCount?: Metric private readonly pendingDialCount?: Metric private readonly shutDownController: AbortController + private readonly connections: PeerMap constructor (components: DialQueueComponents, init: DialerInit = {}) { this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout + this.connections = init.connections ?? new PeerMap() this.peerId = components.peerId this.peerStore = components.peerStore @@ -187,6 +192,23 @@ export class DialQueue { throw err } + // make sure we don't have an existing connection to any of the addresses we + // are about to dial + let existingConnection = Array.from(this.connections.values()).flat().find(conn => { + if (options.force === true) { + return false + } + + return addrsToDial.find(addr => { + return addr.multiaddr.equals(conn.remoteAddr) + }) + }) + + if (existingConnection != null) { + log('already connected to %a', existingConnection.remoteAddr) + return existingConnection + } + // ready to dial, all async work finished - make sure we don't have any // pending dials in progress for this peer or set of multiaddrs const existingDial = this.pendingDials.find(dial => { @@ -257,7 +279,28 @@ export class DialQueue { // let other dials join this one this.pendingDials.push(pendingDial) - return pendingDial.promise + const connection = await pendingDial.promise + + // we may have been dialing a multiaddr without a peer id attached but by + // this point we have upgraded the connection so the remote peer information + // should be available - check again that we don't already have a connection + // to the remote multiaddr + existingConnection = Array.from(this.connections.values()).flat().find(conn => { + if (options.force === true) { + return false + } + + return conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr) + }) + + if (existingConnection != null) { + log('already connected to %a', existingConnection.remoteAddr) + await connection.close() + return existingConnection + } + + log('connection opened to %a', connection.remoteAddr) + return connection } private createDialAbortControllers (userSignal?: AbortSignal): ClearableSignal { diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index c13e49a53a..0afceac860 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -261,7 +261,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT, resolvers: init.resolvers ?? { dnsaddr: dnsaddrResolver - } + }, + connections: this.connections }) } @@ -505,12 +506,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { if (peerId != null && options.force !== true) { log('dial %p', peerId) - const existingConnections = this.getConnections(peerId) + const existingConnection = this.getConnections(peerId) + .find(conn => !conn.transient) - if (existingConnections.length > 0) { - log('had an existing connection to %p', peerId) + if (existingConnection != null) { + log('had an existing non-transient connection to %p', peerId) - return existingConnections[0] + return existingConnection } } diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index ee31843332..4cde008591 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -536,4 +536,40 @@ describe('Connection Manager', () => { await expect(connectionManager.acceptIncomingConnection(maConn2)) .to.eventually.be.true() }) + + it('should allow dialing peers when an existing transient connection exists', async () => { + connectionManager = new DefaultConnectionManager({ + peerId: await createEd25519PeerId(), + peerStore: stubInterface(), + transportManager: stubInterface(), + connectionGater: stubInterface(), + events: new TypedEventEmitter() + }, { + ...defaultOptions, + maxIncomingPendingConnections: 1 + }) + await connectionManager.start() + + const targetPeer = await createEd25519PeerId() + const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`) + + const existingConnection = stubInterface({ + transient: true + }) + const newConnection = stubInterface() + + sinon.stub(connectionManager.dialQueue, 'dial') + .withArgs(addr) + .resolves(newConnection) + + // we have an existing transient connection + const map = connectionManager.getConnectionsMap() + map.set(targetPeer, [ + existingConnection + ]) + + const conn = await connectionManager.openConnection(addr) + + expect(conn).to.equal(newConnection) + }) }) From 8add94ea4b2dffc44ebb309cda696c7d08a4e010 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 2 Nov 2023 15:23:12 -0500 Subject: [PATCH 2/3] deps: bump p-limit from 4.0.0 to 5.0.0 (#2190) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Alex Potsides --- packages/interface-compliance-tests/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/interface-compliance-tests/package.json b/packages/interface-compliance-tests/package.json index 2080d453c8..1ff59b597a 100644 --- a/packages/interface-compliance-tests/package.json +++ b/packages/interface-compliance-tests/package.json @@ -128,7 +128,7 @@ "merge-options": "^3.0.4", "p-defer": "^4.0.0", "p-event": "^6.0.0", - "p-limit": "^4.0.0", + "p-limit": "^5.0.0", "p-wait-for": "^5.0.2", "protons-runtime": "^5.0.0", "sinon": "^17.0.0", From 025c082a4d3d08904f1f5b0209ed6f40648fb78d Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 3 Nov 2023 12:01:15 +0000 Subject: [PATCH 3/3] fix: do not overwrite addresses on identify push when none are sent (#2192) During identify-push in response to a protocol update go-libp2p does not send a signed peer record or the current list of listen addresses. Protobuf does not let us differentiate between an empty list and no list items at all because the field is just repeated - an absence of repeated fields doesn't mean the list was omitted. When the listen address list is empty, preserve any existing addresses. --- packages/libp2p/src/identify/identify.ts | 57 ++++++++++----- packages/libp2p/test/identify/index.spec.ts | 79 ++++++++++++++++++++- 2 files changed, 118 insertions(+), 18 deletions(-) diff --git a/packages/libp2p/src/identify/identify.ts b/packages/libp2p/src/identify/identify.ts index 906e5fddb8..ea5fbcf796 100644 --- a/packages/libp2p/src/identify/identify.ts +++ b/packages/libp2p/src/identify/identify.ts @@ -23,7 +23,7 @@ import type { Libp2pEvents, IdentifyResult, SignedPeerRecord, AbortOptions } fro import type { Connection, Stream } from '@libp2p/interface/connection' import type { TypedEventTarget } from '@libp2p/interface/events' import type { PeerId } from '@libp2p/interface/peer-id' -import type { Peer, PeerStore } from '@libp2p/interface/peer-store' +import type { Peer, PeerData, PeerStore } from '@libp2p/interface/peer-store' import type { Startable } from '@libp2p/interface/startable' import type { AddressManager } from '@libp2p/interface-internal/address-manager' import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' @@ -404,17 +404,30 @@ export class DefaultIdentifyService implements Startable, IdentifyService { log('received identify from %p', connection.remotePeer) if (message == null) { - throw new Error('Message was null or undefined') + throw new CodeError('message was null or undefined', 'ERR_INVALID_MESSAGE') } - const peer = { - addresses: message.listenAddrs.map(buf => ({ + const peer: PeerData = {} + + if (message.listenAddrs.length > 0) { + peer.addresses = message.listenAddrs.map(buf => ({ isCertified: false, multiaddr: multiaddr(buf) - })), - protocols: message.protocols, - metadata: new Map(), - peerRecordEnvelope: message.signedPeerRecord + })) + } + + if (message.protocols.length > 0) { + peer.protocols = message.protocols + } + + if (message.publicKey != null) { + peer.publicKey = message.publicKey + + const peerId = await peerIdFromKeys(message.publicKey) + + if (!peerId.equals(connection.remotePeer)) { + throw new CodeError('public key did not match remote PeerId', 'ERR_INVALID_PUBLIC_KEY') + } } let output: SignedPeerRecord | undefined @@ -429,12 +442,12 @@ export class DefaultIdentifyService implements Startable, IdentifyService { // Verify peerId if (!peerRecord.peerId.equals(envelope.peerId)) { - throw new Error('signing key does not match PeerId in the PeerRecord') + throw new CodeError('signing key does not match PeerId in the PeerRecord', 'ERR_INVALID_SIGNING_KEY') } // Make sure remote peer is the one sending the record if (!connection.remotePeer.equals(peerRecord.peerId)) { - throw new Error('signing key does not match remote PeerId') + throw new CodeError('signing key does not match remote PeerId', 'ERR_INVALID_PEER_RECORD_KEY') } let existingPeer: Peer | undefined @@ -482,15 +495,25 @@ export class DefaultIdentifyService implements Startable, IdentifyService { log('%p did not send a signed peer record', connection.remotePeer) } - if (message.agentVersion != null) { - peer.metadata.set('AgentVersion', uint8ArrayFromString(message.agentVersion)) - } + log('patching %p with', peer) + await this.peerStore.patch(connection.remotePeer, peer) - if (message.protocolVersion != null) { - peer.metadata.set('ProtocolVersion', uint8ArrayFromString(message.protocolVersion)) - } + if (message.agentVersion != null || message.protocolVersion != null) { + const metadata: Record = {} - await this.peerStore.patch(connection.remotePeer, peer) + if (message.agentVersion != null) { + metadata.AgentVersion = uint8ArrayFromString(message.agentVersion) + } + + if (message.protocolVersion != null) { + metadata.ProtocolVersion = uint8ArrayFromString(message.protocolVersion) + } + + log('updating %p metadata', peer) + await this.peerStore.merge(connection.remotePeer, { + metadata + }) + } const result: IdentifyResult = { peerId: connection.remotePeer, diff --git a/packages/libp2p/test/identify/index.spec.ts b/packages/libp2p/test/identify/index.spec.ts index 63134a7d0c..ee64e175bd 100644 --- a/packages/libp2p/test/identify/index.spec.ts +++ b/packages/libp2p/test/identify/index.spec.ts @@ -3,7 +3,7 @@ import { TypedEventEmitter } from '@libp2p/interface/events' import { start, stop } from '@libp2p/interface/startable' -import { mockConnectionGater, mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks' +import { mockConnectionGater, mockRegistrar, mockUpgrader, connectionPair, mockStream } from '@libp2p/interface-compliance-tests/mocks' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { PeerRecord, RecordEnvelope } from '@libp2p/peer-record' import { PersistentPeerStore } from '@libp2p/peer-store' @@ -15,6 +15,7 @@ import drain from 'it-drain' import * as lp from 'it-length-prefixed' import { pipe } from 'it-pipe' import { pbStream } from 'it-protobuf-stream' +import { pushable } from 'it-pushable' import pDefer from 'p-defer' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' @@ -31,8 +32,11 @@ import { DefaultIdentifyService } from '../../src/identify/identify.js' import { identifyService, type IdentifyServiceInit, Message } from '../../src/identify/index.js' import { Identify } from '../../src/identify/pb/message.js' import { DefaultTransportManager } from '../../src/transport-manager.js' +import type { Connection } from '@libp2p/interface/connection' +import type { Peer } from '@libp2p/interface/peer-store' import type { IncomingStreamData } from '@libp2p/interface-internal/registrar' import type { TransportManager } from '@libp2p/interface-internal/transport-manager' +import type { Duplex, Source } from 'it-stream-types' const listenMaddrs = [multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] @@ -504,4 +508,77 @@ describe('identify', () => { }]) expect(peer.id.publicKey).to.equalBytes(remoteComponents.peerId.publicKey) }) + + it('should not overwrite peer data when fields are omitted', async () => { + const localIdentify = new DefaultIdentifyService(localComponents, defaultInit) + await start(localIdentify) + + const peer: Peer = { + id: remoteComponents.peerId, + addresses: [{ + multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), + isCertified: true + }], + protocols: [ + '/proto/1' + ], + metadata: new Map([['key', uint8ArrayFromString('value')]]), + tags: new Map([['key', { value: 1 }]]) + } + + await localComponents.peerStore.save(remoteComponents.peerId, peer) + + const duplex: Duplex, any> = { + source: pushable(), + sink: async (source) => { + await drain(source) + } + } + + duplex.source.push(lp.encode.single(Identify.encode({ + protocols: [ + '/proto/2' + ] + }))) + + await localIdentify._handlePush({ + stream: mockStream(duplex), + connection: stubInterface({ + remotePeer: remoteComponents.peerId + }) + }) + + const updatedPeerData = await localComponents.peerStore.get(remoteComponents.peerId) + expect(updatedPeerData.addresses[0].multiaddr.toString()).to.equal('/ip4/127.0.0.1/tcp/4001') + expect(updatedPeerData.protocols).to.deep.equal(['/proto/2']) + expect(updatedPeerData.metadata.get('key')).to.equalBytes(uint8ArrayFromString('value')) + expect(updatedPeerData.tags.get('key')).to.deep.equal({ value: 1 }) + }) + + it('should reject incorrect public key', async () => { + const localIdentify = new DefaultIdentifyService(localComponents, defaultInit) + await start(localIdentify) + + const duplex: Duplex, any> = { + source: pushable(), + sink: async (source) => { + await drain(source) + } + } + + duplex.source.push(lp.encode.single(Identify.encode({ + publicKey: Uint8Array.from([0, 1, 2, 3, 4]) + }))) + + const localPeerStorePatchSpy = sinon.spy(localComponents.peerStore, 'patch') + + await localIdentify._handlePush({ + stream: mockStream(duplex), + connection: stubInterface({ + remotePeer: remoteComponents.peerId + }) + }) + + expect(localPeerStorePatchSpy.called).to.be.false('patch was called when public key was invalid') + }) })