From e566d76fccbf9b1b4d461a747429941643c3de3b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 14 Nov 2024 14:09:59 +0000 Subject: [PATCH] chore: check connection streams --- .../src/transport/index.ts | 10 +++++----- packages/transport-webrtc/src/muxer.ts | 13 ++++++++++--- packages/transport-webrtc/src/stream.ts | 8 ++++++-- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/packages/interface-compliance-tests/src/transport/index.ts b/packages/interface-compliance-tests/src/transport/index.ts index 1e2f840b98..54ecdef632 100644 --- a/packages/interface-compliance-tests/src/transport/index.ts +++ b/packages/interface-compliance-tests/src/transport/index.ts @@ -140,7 +140,7 @@ export default (common: TestSetup): void => { .with.property('name', 'AbortError') }) - it('should close all streams when the connection closes', async () => { + it.only('should close all streams when the connection closes', async () => { ({ dialer, listener, dialAddrs } = await getSetup(common)) let incomingConnectionPromise: DeferredPromise | undefined @@ -164,14 +164,14 @@ export default (common: TestSetup): void => { remoteConn = await incomingConnectionPromise.promise } - const streams: Stream[] = [] - for (let i = 0; i < 5; i++) { - streams.push(await connection.newStream('/echo/1.0.0', { + await connection.newStream('/echo/1.0.0', { maxOutboundStreams: 5 - })) + }) } + const streams = connection.streams + // Close the connection and verify all streams have been closed await connection.close() diff --git a/packages/transport-webrtc/src/muxer.ts b/packages/transport-webrtc/src/muxer.ts index 63dccd16ff..cf6a5b7588 100644 --- a/packages/transport-webrtc/src/muxer.ts +++ b/packages/transport-webrtc/src/muxer.ts @@ -155,12 +155,16 @@ export class DataChannelMuxer implements StreamMuxer { return } + // lib-datachannel throws if `.getId` is called on a closed channel so + // memoize it + const id = channel.id + const stream = createStream({ channel, direction: 'inbound', onEnd: () => { - this.log('incoming channel %s ended with state %s', channel.id, channel.readyState) this.#onStreamEnd(stream, channel) + this.log('incoming channel %s ended', id) }, logger: this.logger, ...this.dataChannelOptions @@ -241,15 +245,18 @@ export class DataChannelMuxer implements StreamMuxer { newStream (): Stream { // The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label const channel = this.peerConnection.createDataChannel('') + // lib-datachannel throws if `.getId` is called on a closed channel so + // memoize it + const id = channel.id - this.log.trace('opened outgoing datachannel with channel id %s', channel.id) + this.log.trace('opened outgoing datachannel with channel id %s', id) const stream = createStream({ channel, direction: 'outbound', onEnd: () => { - this.log('outgoing channel %s ended with state %s', channel.id, channel.readyState) this.#onStreamEnd(stream, channel) + this.log('outgoing channel %s ended', id) }, logger: this.logger, ...this.dataChannelOptions diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index 694b9ba7ac..b345414c44 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -274,7 +274,11 @@ export class WebRTCStream extends AbstractStream { } async sendReset (): Promise { - await this._sendFlag(Message.Flag.RESET) + try { + await this._sendFlag(Message.Flag.RESET) + } catch (err) { + this.log.error('failed to send reset - %e', err) + } } async sendCloseWrite (options: AbortOptions): Promise { @@ -362,7 +366,7 @@ export class WebRTCStream extends AbstractStream { return true } catch (err: any) { - this.log.error('could not send flag %s', flag.toString(), err) + this.log.error('could not send flag %s - %e', flag.toString(), err) } return false