From a970b536caeb6e9aff4d1b2e6f097878c475ef3f Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 18 Jun 2024 09:17:23 +0100 Subject: [PATCH 1/3] chore: remove logging code (#2594) Remove code left in by accident. --- .../src/transport/reservation-store.ts | 2 +- packages/utils/src/queue/job.ts | 2 +- packages/utils/src/queue/recipient.ts | 4 +--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts b/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts index 280d6347b8..d22ce32246 100644 --- a/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts +++ b/packages/transport-circuit-relay-v2/src/transport/reservation-store.ts @@ -167,7 +167,7 @@ export class ReservationStore extends TypedEventEmitter } if (this.relayFilter.has(peerId.toBytes())) { - this.log('potential relay peer %p has failed previously, not trying again', peerId, new Error('where').stack) + this.log('potential relay peer %p has failed previously, not trying again', peerId) return } diff --git a/packages/utils/src/queue/job.ts b/packages/utils/src/queue/job.ts index 3830f791a2..6b08575d2f 100644 --- a/packages/utils/src/queue/job.ts +++ b/packages/utils/src/queue/job.ts @@ -59,7 +59,7 @@ export class Job { - const recipient = new JobRecipient((new Error('where')).stack, options.signal) + const recipient = new JobRecipient(options.signal) this.recipients.push(recipient) options.signal?.addEventListener('abort', this.onAbort) diff --git a/packages/utils/src/queue/recipient.ts b/packages/utils/src/queue/recipient.ts index 07e8f5733f..f482651eff 100644 --- a/packages/utils/src/queue/recipient.ts +++ b/packages/utils/src/queue/recipient.ts @@ -5,12 +5,10 @@ import type { DeferredPromise } from 'p-defer' export class JobRecipient { public deferred: DeferredPromise public signal?: AbortSignal - public where?: string - constructor (where?: string, signal?: AbortSignal) { + constructor (signal?: AbortSignal) { this.signal = signal this.deferred = pDefer() - this.where = where this.onAbort = this.onAbort.bind(this) this.signal?.addEventListener('abort', this.onAbort) From 8e4fdcde999a64b6f6e573960b2a53cc78c0bebf Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 18 Jun 2024 09:30:07 +0100 Subject: [PATCH 2/3] fix: create RTCPeerConnection after dialing remote peer (#2593) Chrome limits how many RTCPeerConnections a given tab can instantiated during it's lifetime - https://issues.chromium.org/issues/41378764 To delay hitting this limit, only create the dial-end RTCPeerConnection once a relayed connection has successfully been opened to the dial target, this prevents needlessly creating RTCPeerConnections when the dial fails before they are actually used. Fixes #2591 --- .../private-to-private/initiate-connection.ts | 30 +++++++++---- .../src/private-to-private/transport.ts | 31 +++++++------- .../src/private-to-private/util.ts | 7 +++- packages/transport-webrtc/test/peer.spec.ts | 42 +++++++++---------- packages/transport-webrtc/test/stream.spec.ts | 2 +- 5 files changed, 62 insertions(+), 50 deletions(-) diff --git a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts index 3350f0ac8c..8632e28bc9 100644 --- a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts +++ b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts @@ -1,12 +1,13 @@ import { CodeError } from '@libp2p/interface' import { peerIdFromString } from '@libp2p/peer-id' import { pbStream } from 'it-protobuf-stream' -import { type RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js' +import { DataChannelMuxerFactory } from '../muxer.js' +import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js' import { Message } from './pb/message.js' import { SIGNALING_PROTO_ID, splitAddr, type WebRTCTransportMetrics } from './transport.js' import { readCandidatesUntilConnected } from './util.js' import type { DataChannelOptions } from '../index.js' -import type { LoggerOptions, Connection } from '@libp2p/interface' +import type { LoggerOptions, Connection, ComponentLogger } from '@libp2p/interface' import type { ConnectionManager, IncomingStreamData, TransportManager } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' @@ -17,16 +18,18 @@ export interface IncomingStreamOpts extends IncomingStreamData { } export interface ConnectOptions extends LoggerOptions { - peerConnection: RTCPeerConnection + rtcConfiguration?: RTCConfiguration + dataChannel?: DataChannelOptions multiaddr: Multiaddr connectionManager: ConnectionManager transportManager: TransportManager dataChannelOptions?: Partial signal?: AbortSignal metrics?: WebRTCTransportMetrics + logger: ComponentLogger } -export async function initiateConnection ({ peerConnection, signal, metrics, multiaddr: ma, connectionManager, transportManager, log }: ConnectOptions): Promise<{ remoteAddress: Multiaddr }> { +export async function initiateConnection ({ rtcConfiguration, dataChannel, signal, metrics, multiaddr: ma, connectionManager, transportManager, log, logger }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: RTCPeerConnection, muxerFactory: DataChannelMuxerFactory }> { const { baseAddr } = splitAddr(ma) metrics?.dialerEvents.increment({ open: true }) @@ -64,6 +67,13 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul }) const messageStream = pbStream(stream).pb(Message) + const peerConnection = new RTCPeerConnection(rtcConfiguration) + const muxerFactory = new DataChannelMuxerFactory({ + logger + }, { + peerConnection, + dataChannelOptions: dataChannel + }) try { // we create the channel so that the RTCPeerConnection has a component for @@ -79,7 +89,7 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul // see - https://www.w3.org/TR/webrtc/#rtcpeerconnectioniceevent const data = JSON.stringify(candidate?.toJSON() ?? null) - log.trace('initiator sending ICE candidate %s', data) + log.trace('initiator sending ICE candidate %o', candidate) void messageStream.write({ type: Message.Type.ICE_CANDIDATE, @@ -142,17 +152,21 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul log.trace('initiator connected, closing init channel') channel.close() - log.trace('initiator closing signalling stream') - await messageStream.unwrap().unwrap().close({ + log.trace('closing signalling channel') + await stream.close({ signal }) log.trace('initiator connected to remote address %s', ma) return { - remoteAddress: ma + remoteAddress: ma, + peerConnection, + muxerFactory } } catch (err: any) { + log.error('outgoing signalling error', err) + peerConnection.close() stream.abort(err) throw err diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index 89ad2ee48b..e0c01e1633 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -133,20 +133,16 @@ export class WebRTCTransport implements Transport, Startable { async dial (ma: Multiaddr, options: DialOptions): Promise { this.log.trace('dialing address: %a', ma) - const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration) - const muxerFactory = new DataChannelMuxerFactory(this.components, { - peerConnection, - dataChannelOptions: this.init.dataChannel - }) - - const { remoteAddress } = await initiateConnection({ - peerConnection, + const { remoteAddress, peerConnection, muxerFactory } = await initiateConnection({ + rtcConfiguration: this.init.rtcConfiguration, + dataChannel: this.init.dataChannel, multiaddr: ma, dataChannelOptions: this.init.dataChannel, signal: options.signal, connectionManager: this.components.connectionManager, transportManager: this.components.transportManager, - log: this.log + log: this.log, + logger: this.components.logger }) const webRTCConn = new WebRTCMultiaddrConnection(this.components, { @@ -185,6 +181,11 @@ export class WebRTCTransport implements Transport, Startable { log: this.log }) + // close the stream if SDP messages have been exchanged successfully + await stream.close({ + signal + }) + const webRTCConn = new WebRTCMultiaddrConnection(this.components, { peerConnection, timeline: { open: (new Date()).getTime() }, @@ -192,20 +193,18 @@ export class WebRTCTransport implements Transport, Startable { metrics: this.metrics?.listenerEvents }) - // close the connection on shut down - this._closeOnShutdown(peerConnection, webRTCConn) - await this.components.upgrader.upgradeInbound(webRTCConn, { skipEncryption: true, skipProtection: true, muxerFactory }) - // close the stream if SDP messages have been exchanged successfully - await stream.close({ - signal - }) + // close the connection on shut down + this._closeOnShutdown(peerConnection, webRTCConn) } catch (err: any) { + this.log.error('incoming signalling error', err) + + peerConnection.close() stream.abort(err) throw err } diff --git a/packages/transport-webrtc/src/private-to-private/util.ts b/packages/transport-webrtc/src/private-to-private/util.ts index a3fba97f43..e13985efaf 100644 --- a/packages/transport-webrtc/src/private-to-private/util.ts +++ b/packages/transport-webrtc/src/private-to-private/util.ts @@ -23,11 +23,14 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream connectedPromise.promise, stream.read({ signal: options.signal - }) + }).catch(() => {}) ]) // stream ended or we became connected if (message == null) { + // throw if we timed out + options.signal?.throwIfAborted() + break } @@ -48,7 +51,7 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream const candidate = new RTCIceCandidate(candidateInit) - options.log.trace('%s received new ICE candidate', options.direction, candidate) + options.log.trace('%s received new ICE candidate %o', options.direction, candidateInit) try { await pc.addIceCandidate(candidate) diff --git a/packages/transport-webrtc/test/peer.spec.ts b/packages/transport-webrtc/test/peer.spec.ts index 8c2b85f188..7a7ab3db16 100644 --- a/packages/transport-webrtc/test/peer.spec.ts +++ b/packages/transport-webrtc/test/peer.spec.ts @@ -15,19 +15,19 @@ import { Message } from '../src/private-to-private/pb/message.js' import { handleIncomingStream } from '../src/private-to-private/signaling-stream-handler.js' import { SIGNALING_PROTO_ID, WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js' import { RTCPeerConnection, RTCSessionDescription } from '../src/webrtc/index.js' -import type { Logger, Connection, Stream } from '@libp2p/interface' +import type { Logger, Connection, Stream, ComponentLogger } from '@libp2p/interface' import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal' const browser = detect() interface Initiator { multiaddr: Multiaddr - peerConnection: RTCPeerConnection connectionManager: StubbedInstance transportManager: StubbedInstance connection: StubbedInstance stream: Stream log: Logger + logger: ComponentLogger } interface Recipient { @@ -67,12 +67,12 @@ async function getComponents (): Promise { return { initiator: { multiaddr: receiverMultiaddr, - peerConnection: new RTCPeerConnection(), connectionManager: stubInterface(), transportManager: stubInterface(), connection: stubInterface(), stream: initiatorStream, - log: logger('test') + log: logger('test'), + logger: defaultLogger() }, recipient: { peerConnection: new RTCPeerConnection(), @@ -91,9 +91,10 @@ describe('webrtc basic', () => { const isFirefox = ((browser != null) && browser.name === 'firefox') let initiator: Initiator let recipient: Recipient + let initiatorPeerConnection: RTCPeerConnection afterEach(() => { - initiator?.peerConnection?.close() + initiatorPeerConnection?.close() recipient?.peerConnection?.close() }) @@ -109,7 +110,7 @@ describe('webrtc basic', () => { // signalling stream opens successfully initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream) - await expect( + ;[{ peerConnection: initiatorPeerConnection }] = await expect( Promise.all([ initiateConnection(initiator), handleIncomingStream(recipient) @@ -118,11 +119,11 @@ describe('webrtc basic', () => { await pRetry(async () => { if (isFirefox) { - expect(initiator.peerConnection.iceConnectionState).eq('connected') + expect(initiatorPeerConnection.iceConnectionState).eq('connected') expect(recipient.peerConnection.iceConnectionState).eq('connected') return } - expect(initiator.peerConnection.connectionState).eq('connected') + expect(initiatorPeerConnection.connectionState).eq('connected') expect(recipient.peerConnection.connectionState).eq('connected') }) }) @@ -137,18 +138,14 @@ describe('webrtc basic', () => { // transport manager dials recipient initiator.transportManager.dial.resolves(initiator.connection) - const createOffer = initiator.peerConnection.setRemoteDescription.bind(initiator.peerConnection) - - initiator.peerConnection.setRemoteDescription = async (name) => { - // the dial is aborted + initiator.connection.newStream.callsFake(async () => { + // the operation is aborted abortController.abort(new Error('Oh noes!')) - // setting the description takes some time + // opening the stream takes some time await delay(100) - return createOffer(name) - } - - // signalling stream opens successfully - initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream) + // signalling stream opens successfully + return initiator.stream + }) await expect(Promise.all([ initiateConnection({ @@ -164,9 +161,10 @@ describe('webrtc basic', () => { describe('webrtc receiver', () => { let initiator: Initiator let recipient: Recipient + let initiatorPeerConnection: RTCPeerConnection afterEach(() => { - initiator?.peerConnection?.close() + initiatorPeerConnection?.close() recipient?.peerConnection?.close() }) @@ -177,18 +175,16 @@ describe('webrtc receiver', () => { await stream.write({ type: Message.Type.SDP_OFFER, data: 'bad' }) await expect(receiverPeerConnectionPromise).to.be.rejectedWith(/Failed to set remoteDescription/) - - initiator.peerConnection.close() - recipient.peerConnection.close() }) }) describe('webrtc dialer', () => { let initiator: Initiator let recipient: Recipient + let initiatorPeerConnection: RTCPeerConnection afterEach(() => { - initiator?.peerConnection?.close() + initiatorPeerConnection?.close() recipient?.peerConnection?.close() }) diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index b9c8735ea9..cb40e8912e 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -105,7 +105,7 @@ describe('Max message size', () => { await expect(webrtcStream.sink([new Uint8Array(1)])).to.eventually.be.rejected .with.property('code', 'ERR_BUFFER_CLEAR_TIMEOUT') const t1 = Date.now() - expect(t1 - t0).greaterThan(timeout) + expect(t1 - t0).greaterThanOrEqual(timeout) expect(t1 - t0).lessThan(timeout + 1000) // Some upper bound await closed.promise expect(webrtcStream.timeline.close).to.be.greaterThan(webrtcStream.timeline.open) From 9e0236627b50a389df3350a90e58720cc205f0af Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 18 Jun 2024 13:30:03 +0100 Subject: [PATCH 3/3] feat: allow passing a function for rtcConfiguration (#2590) In order to allow refreshing STUN/TURN credentials between dials, allow passing a function that returns rtc config. Fixes #2554 --- .../transport-webrtc/src/private-to-private/transport.ts | 6 +++--- .../transport-webrtc/src/private-to-public/transport.ts | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index e0c01e1633..134bd7a9b8 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -19,7 +19,7 @@ export const SIGNALING_PROTO_ID = '/webrtc-signaling/0.0.1' const INBOUND_CONNECTION_TIMEOUT = 30 * 1000 export interface WebRTCTransportInit { - rtcConfiguration?: RTCConfiguration + rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise) dataChannel?: DataChannelOptions /** @@ -134,7 +134,7 @@ export class WebRTCTransport implements Transport, Startable { this.log.trace('dialing address: %a', ma) const { remoteAddress, peerConnection, muxerFactory } = await initiateConnection({ - rtcConfiguration: this.init.rtcConfiguration, + rtcConfiguration: typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration, dataChannel: this.init.dataChannel, multiaddr: ma, dataChannelOptions: this.init.dataChannel, @@ -166,7 +166,7 @@ export class WebRTCTransport implements Transport, Startable { async _onProtocol ({ connection, stream }: IncomingStreamData): Promise { const signal = AbortSignal.timeout(this.init.inboundConnectionTimeout ?? INBOUND_CONNECTION_TIMEOUT) - const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration) + const peerConnection = new RTCPeerConnection(typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration) const muxerFactory = new DataChannelMuxerFactory(this.components, { peerConnection, dataChannelOptions: this.init.dataChannel diff --git a/packages/transport-webrtc/src/private-to-public/transport.ts b/packages/transport-webrtc/src/private-to-public/transport.ts index 1ce54776ad..bf026fabc8 100644 --- a/packages/transport-webrtc/src/private-to-public/transport.ts +++ b/packages/transport-webrtc/src/private-to-public/transport.ts @@ -52,6 +52,7 @@ export interface WebRTCMetrics { } export interface WebRTCTransportDirectInit { + rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise) dataChannel?: DataChannelOptions } @@ -137,7 +138,10 @@ export class WebRTCDirectTransport implements Transport { hash: sdp.toSupportedHashFunction(remoteCerthash.name) } as any) - const peerConnection = new RTCPeerConnection({ certificates: [certificate] }) + const peerConnection = new RTCPeerConnection({ + ...(typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration ?? {}), + certificates: [certificate] + }) try { // create data channel for running the noise handshake. Once the data channel is opened,