Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feat/webrtc-direct-in…
Browse files Browse the repository at this point in the history
…-node-js
  • Loading branch information
achingbrain committed Jun 18, 2024
2 parents 1e052de + 9e02366 commit 71bf4cb
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export class ReservationStore extends TypedEventEmitter<ReservationStoreEvents>
}

if (this.relayFilter.has(peerId.toBytes())) {
this.log('potential relay peer %p has failed previously, not trying again', peerId, new Error('where').stack)
this.log('potential relay peer %p has failed previously, not trying again', peerId)
return
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { CodeError } from '@libp2p/interface'
import { peerIdFromString } from '@libp2p/peer-id'
import { pbStream } from 'it-protobuf-stream'
import { RTCSessionDescription } from '../webrtc/index.js'
import { DataChannelMuxerFactory } from '../muxer.js'
import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
import { Message } from './pb/message.js'
import { SIGNALING_PROTO_ID, splitAddr, type WebRTCTransportMetrics } from './transport.js'
import { readCandidatesUntilConnected } from './util.js'
import type { DataChannelOptions } from '../index.js'
import type { LoggerOptions, Connection } from '@libp2p/interface'
import type { LoggerOptions, Connection, ComponentLogger } from '@libp2p/interface'
import type { ConnectionManager, IncomingStreamData, TransportManager } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand All @@ -17,16 +18,18 @@ export interface IncomingStreamOpts extends IncomingStreamData {
}

export interface ConnectOptions extends LoggerOptions {
peerConnection: RTCPeerConnection
rtcConfiguration?: RTCConfiguration
dataChannel?: DataChannelOptions
multiaddr: Multiaddr
connectionManager: ConnectionManager
transportManager: TransportManager
dataChannelOptions?: Partial<DataChannelOptions>
signal?: AbortSignal
metrics?: WebRTCTransportMetrics
logger: ComponentLogger
}

export async function initiateConnection ({ peerConnection, signal, metrics, multiaddr: ma, connectionManager, transportManager, log }: ConnectOptions): Promise<{ remoteAddress: Multiaddr }> {
export async function initiateConnection ({ rtcConfiguration, dataChannel, signal, metrics, multiaddr: ma, connectionManager, transportManager, log, logger }: ConnectOptions): Promise<{ remoteAddress: Multiaddr, peerConnection: RTCPeerConnection, muxerFactory: DataChannelMuxerFactory }> {
const { baseAddr } = splitAddr(ma)

metrics?.dialerEvents.increment({ open: true })
Expand Down Expand Up @@ -64,6 +67,13 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
})

const messageStream = pbStream(stream).pb(Message)
const peerConnection = new RTCPeerConnection(rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory({
logger
}, {
peerConnection,
dataChannelOptions: dataChannel
})

try {
// we create the channel so that the RTCPeerConnection has a component for
Expand All @@ -79,7 +89,7 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
// see - https://www.w3.org/TR/webrtc/#rtcpeerconnectioniceevent
const data = JSON.stringify(candidate?.toJSON() ?? null)

log.trace('initiator sending ICE candidate %s', data)
log.trace('initiator sending ICE candidate %o', candidate)

void messageStream.write({
type: Message.Type.ICE_CANDIDATE,
Expand Down Expand Up @@ -142,17 +152,21 @@ export async function initiateConnection ({ peerConnection, signal, metrics, mul
log.trace('initiator connected, closing init channel')
channel.close()

log.trace('initiator closing signalling stream')
await messageStream.unwrap().unwrap().close({
log.trace('closing signalling channel')
await stream.close({
signal
})

log.trace('initiator connected to remote address %s', ma)

return {
remoteAddress: ma
remoteAddress: ma,
peerConnection,
muxerFactory
}
} catch (err: any) {
log.error('outgoing signalling error', err)

peerConnection.close()
stream.abort(err)
throw err
Expand Down
35 changes: 17 additions & 18 deletions packages/transport-webrtc/src/private-to-private/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const SIGNALING_PROTO_ID = '/webrtc-signaling/0.0.1'
const INBOUND_CONNECTION_TIMEOUT = 30 * 1000

export interface WebRTCTransportInit {
rtcConfiguration?: RTCConfiguration
rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise<RTCConfiguration>)
dataChannel?: DataChannelOptions

/**
Expand Down Expand Up @@ -133,20 +133,16 @@ export class WebRTCTransport implements Transport, Startable {
async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
this.log.trace('dialing address: %a', ma)

const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory(this.components, {
peerConnection,
dataChannelOptions: this.init.dataChannel
})

const { remoteAddress } = await initiateConnection({
peerConnection,
const { remoteAddress, peerConnection, muxerFactory } = await initiateConnection({
rtcConfiguration: typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration,
dataChannel: this.init.dataChannel,
multiaddr: ma,
dataChannelOptions: this.init.dataChannel,
signal: options.signal,
connectionManager: this.components.connectionManager,
transportManager: this.components.transportManager,
log: this.log
log: this.log,
logger: this.components.logger
})

const webRTCConn = new WebRTCMultiaddrConnection(this.components, {
Expand All @@ -170,7 +166,7 @@ export class WebRTCTransport implements Transport, Startable {

async _onProtocol ({ connection, stream }: IncomingStreamData): Promise<void> {
const signal = AbortSignal.timeout(this.init.inboundConnectionTimeout ?? INBOUND_CONNECTION_TIMEOUT)
const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration)
const peerConnection = new RTCPeerConnection(typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration)
const muxerFactory = new DataChannelMuxerFactory(this.components, {
peerConnection,
dataChannelOptions: this.init.dataChannel
Expand All @@ -185,27 +181,30 @@ export class WebRTCTransport implements Transport, Startable {
log: this.log
})

// close the stream if SDP messages have been exchanged successfully
await stream.close({
signal
})

const webRTCConn = new WebRTCMultiaddrConnection(this.components, {
peerConnection,
timeline: { open: (new Date()).getTime() },
remoteAddr: remoteAddress,
metrics: this.metrics?.listenerEvents
})

// close the connection on shut down
this._closeOnShutdown(peerConnection, webRTCConn)

await this.components.upgrader.upgradeInbound(webRTCConn, {
skipEncryption: true,
skipProtection: true,
muxerFactory
})

// close the stream if SDP messages have been exchanged successfully
await stream.close({
signal
})
// close the connection on shut down
this._closeOnShutdown(peerConnection, webRTCConn)
} catch (err: any) {
this.log.error('incoming signalling error', err)

peerConnection.close()
stream.abort(err)
throw err
}
Expand Down
7 changes: 5 additions & 2 deletions packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream
connectedPromise.promise,
stream.read({
signal: options.signal
})
}).catch(() => {})
])

// stream ended or we became connected
if (message == null) {
// throw if we timed out
options.signal?.throwIfAborted()

break
}

Expand All @@ -48,7 +51,7 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream

const candidate = new RTCIceCandidate(candidateInit)

options.log.trace('%s received new ICE candidate', options.direction, candidate)
options.log.trace('%s received new ICE candidate %o', options.direction, candidateInit)

try {
await pc.addIceCandidate(candidate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class WebRTCDirectTransport implements Transport {
const ufrag = UFRAG_PREFIX + genUfrag(32)

// https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md#browser-to-public-server
const peerConnection = await createDialerRTCPeerConnection('NodeA', ufrag, this.init.rtcConfiguration)
const peerConnection = await createDialerRTCPeerConnection('NodeA', ufrag, typeof this.init.rtcConfiguration === 'function' ? await this.init.rtcConfiguration() : this.init.rtcConfiguration ?? {})

try {
return await raceSignal(connect(peerConnection, ufrag, ufrag, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ export class DirectRTCPeerConnection extends RTCPeerConnection {

export async function createDialerRTCPeerConnection (name: string, ufrag: string, rtcConfiguration?: RTCConfiguration | (() => RTCConfiguration | Promise<RTCConfiguration>), certificate?: TransportCertificate): Promise<DirectRTCPeerConnection> {
if (certificate == null) {
// ECDSA is preferred over RSA here. From our testing we find that P-256
// elliptic curve is supported by Pion, webrtc-rs, as well as Chromium
// (P-228 and P-384 was not supported in Chromium). We use the same hash
// function as found in the multiaddr if it is supported.
const keyPair = await crypto.subtle.generateKey({
name: 'ECDSA',
namedCurve: 'P-256'
Expand Down
42 changes: 19 additions & 23 deletions packages/transport-webrtc/test/peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ import { Message } from '../src/private-to-private/pb/message.js'
import { handleIncomingStream } from '../src/private-to-private/signaling-stream-handler.js'
import { SIGNALING_PROTO_ID, WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js'
import { RTCPeerConnection, RTCSessionDescription } from '../src/webrtc/index.js'
import type { Logger, Connection, Stream } from '@libp2p/interface'
import type { Logger, Connection, Stream, ComponentLogger } from '@libp2p/interface'
import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal'

const browser = detect()

interface Initiator {
multiaddr: Multiaddr
peerConnection: RTCPeerConnection
connectionManager: StubbedInstance<ConnectionManager>
transportManager: StubbedInstance<TransportManager>
connection: StubbedInstance<Connection>
stream: Stream
log: Logger
logger: ComponentLogger
}

interface Recipient {
Expand Down Expand Up @@ -67,12 +67,12 @@ async function getComponents (): Promise<PrivateToPrivateComponents> {
return {
initiator: {
multiaddr: receiverMultiaddr,
peerConnection: new RTCPeerConnection(),
connectionManager: stubInterface<ConnectionManager>(),
transportManager: stubInterface<TransportManager>(),
connection: stubInterface<Connection>(),
stream: initiatorStream,
log: logger('test')
log: logger('test'),
logger: defaultLogger()
},
recipient: {
peerConnection: new RTCPeerConnection(),
Expand All @@ -91,9 +91,10 @@ describe('webrtc basic', () => {
const isFirefox = ((browser != null) && browser.name === 'firefox')
let initiator: Initiator
let recipient: Recipient
let initiatorPeerConnection: RTCPeerConnection

afterEach(() => {
initiator?.peerConnection?.close()
initiatorPeerConnection?.close()
recipient?.peerConnection?.close()
})

Expand All @@ -109,7 +110,7 @@ describe('webrtc basic', () => {
// signalling stream opens successfully
initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream)

await expect(
;[{ peerConnection: initiatorPeerConnection }] = await expect(
Promise.all([
initiateConnection(initiator),
handleIncomingStream(recipient)
Expand All @@ -118,11 +119,11 @@ describe('webrtc basic', () => {

await pRetry(async () => {
if (isFirefox) {
expect(initiator.peerConnection.iceConnectionState).eq('connected')
expect(initiatorPeerConnection.iceConnectionState).eq('connected')
expect(recipient.peerConnection.iceConnectionState).eq('connected')
return
}
expect(initiator.peerConnection.connectionState).eq('connected')
expect(initiatorPeerConnection.connectionState).eq('connected')
expect(recipient.peerConnection.connectionState).eq('connected')
})
})
Expand All @@ -137,18 +138,14 @@ describe('webrtc basic', () => {
// transport manager dials recipient
initiator.transportManager.dial.resolves(initiator.connection)

const createOffer = initiator.peerConnection.setRemoteDescription.bind(initiator.peerConnection)

initiator.peerConnection.setRemoteDescription = async (name) => {
// the dial is aborted
initiator.connection.newStream.callsFake(async () => {
// the operation is aborted
abortController.abort(new Error('Oh noes!'))
// setting the description takes some time
// opening the stream takes some time
await delay(100)
return createOffer(name)
}

// signalling stream opens successfully
initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream)
// signalling stream opens successfully
return initiator.stream
})

await expect(Promise.all([
initiateConnection({
Expand All @@ -164,9 +161,10 @@ describe('webrtc basic', () => {
describe('webrtc receiver', () => {
let initiator: Initiator
let recipient: Recipient
let initiatorPeerConnection: RTCPeerConnection

afterEach(() => {
initiator?.peerConnection?.close()
initiatorPeerConnection?.close()
recipient?.peerConnection?.close()
})

Expand All @@ -177,18 +175,16 @@ describe('webrtc receiver', () => {

await stream.write({ type: Message.Type.SDP_OFFER, data: 'bad' })
await expect(receiverPeerConnectionPromise).to.be.rejectedWith(/Failed to set remoteDescription/)

initiator.peerConnection.close()
recipient.peerConnection.close()
})
})

describe('webrtc dialer', () => {
let initiator: Initiator
let recipient: Recipient
let initiatorPeerConnection: RTCPeerConnection

afterEach(() => {
initiator?.peerConnection?.close()
initiatorPeerConnection?.close()
recipient?.peerConnection?.close()
})

Expand Down
2 changes: 1 addition & 1 deletion packages/transport-webrtc/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe('Max message size', () => {
await expect(webrtcStream.sink([new Uint8Array(1)])).to.eventually.be.rejected
.with.property('code', 'ERR_BUFFER_CLEAR_TIMEOUT')
const t1 = Date.now()
expect(t1 - t0).greaterThan(timeout)
expect(t1 - t0).greaterThanOrEqual(timeout)
expect(t1 - t0).lessThan(timeout + 1000) // Some upper bound
await closed.promise
expect(webrtcStream.timeline.close).to.be.greaterThan(webrtcStream.timeline.open)
Expand Down
2 changes: 1 addition & 1 deletion packages/utils/src/queue/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class Job <JobOptions extends AbortOptions = AbortOptions, JobReturnType
}

async join (options: AbortOptions = {}): Promise<JobReturnType> {
const recipient = new JobRecipient<JobReturnType>((new Error('where')).stack, options.signal)
const recipient = new JobRecipient<JobReturnType>(options.signal)
this.recipients.push(recipient)

options.signal?.addEventListener('abort', this.onAbort)
Expand Down
4 changes: 1 addition & 3 deletions packages/utils/src/queue/recipient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import type { DeferredPromise } from 'p-defer'
export class JobRecipient<JobReturnType> {
public deferred: DeferredPromise<JobReturnType>
public signal?: AbortSignal
public where?: string

constructor (where?: string, signal?: AbortSignal) {
constructor (signal?: AbortSignal) {
this.signal = signal
this.deferred = pDefer()
this.where = where

this.onAbort = this.onAbort.bind(this)
this.signal?.addEventListener('abort', this.onAbort)
Expand Down

0 comments on commit 71bf4cb

Please sign in to comment.