From 0b72656435f789ffe2b6ba2b0dc2c7fe71feb96b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 4 Nov 2024 22:29:08 +0000 Subject: [PATCH] feat: add memory transport Adds an in-memory only transport using the `/memory/...` multiaddr protocol. This lets us connect multiple in-process nodes together with controlable latency settings to aid testing. --- .release-please-manifest.json | 2 +- .release-please.json | 1 + packages/transport-memory/.aegir.js | 6 + packages/transport-memory/LICENSE | 4 + packages/transport-memory/LICENSE-APACHE | 5 + packages/transport-memory/LICENSE-MIT | 19 ++ packages/transport-memory/README.md | 86 +++++++++ packages/transport-memory/package.json | 75 ++++++++ packages/transport-memory/src/connections.ts | 176 ++++++++++++++++++ packages/transport-memory/src/index.ts | 62 ++++++ packages/transport-memory/src/listener.ts | 135 ++++++++++++++ packages/transport-memory/src/memory.ts | 113 +++++++++++ .../transport-memory/test/compliance.spec.ts | 69 +++++++ packages/transport-memory/tsconfig.json | 27 +++ packages/transport-memory/typedoc.json | 5 + 15 files changed, 784 insertions(+), 1 deletion(-) create mode 100644 packages/transport-memory/.aegir.js create mode 100644 packages/transport-memory/LICENSE create mode 100644 packages/transport-memory/LICENSE-APACHE create mode 100644 packages/transport-memory/LICENSE-MIT create mode 100644 packages/transport-memory/README.md create mode 100644 packages/transport-memory/package.json create mode 100644 packages/transport-memory/src/connections.ts create mode 100644 packages/transport-memory/src/index.ts create mode 100644 packages/transport-memory/src/listener.ts create mode 100644 packages/transport-memory/src/memory.ts create mode 100644 packages/transport-memory/test/compliance.spec.ts create mode 100644 packages/transport-memory/tsconfig.json create mode 100644 packages/transport-memory/typedoc.json diff --git a/.release-please-manifest.json b/.release-please-manifest.json index c16fe6e36f..fec9fa0774 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1 +1 @@ -{"packages/connection-encrypter-plaintext":"2.0.10","packages/connection-encrypter-tls":"2.0.10","packages/crypto":"5.0.6","packages/interface":"2.2.0","packages/interface-compliance-tests":"6.1.8","packages/interface-internal":"2.0.10","packages/kad-dht":"14.1.0","packages/keychain":"5.0.9","packages/libp2p":"2.2.1","packages/logger":"5.1.3","packages/metrics-devtools":"1.1.8","packages/metrics-prometheus":"4.2.4","packages/metrics-simple":"1.2.6","packages/multistream-select":"6.0.8","packages/peer-collections":"6.0.10","packages/peer-discovery-bootstrap":"11.0.10","packages/peer-discovery-mdns":"11.0.10","packages/peer-id":"5.0.7","packages/peer-record":"8.0.10","packages/peer-store":"11.0.10","packages/pnet":"2.0.10","packages/protocol-autonat":"2.0.10","packages/protocol-dcutr":"2.0.10","packages/protocol-echo":"2.1.1","packages/protocol-fetch":"2.0.10","packages/protocol-identify":"3.0.10","packages/protocol-perf":"4.0.10","packages/protocol-ping":"2.0.10","packages/pubsub":"10.0.10","packages/pubsub-floodsub":"10.1.8","packages/record":"4.0.4","packages/stream-multiplexer-mplex":"11.0.10","packages/transport-circuit-relay-v2":"3.1.0","packages/transport-tcp":"10.0.11","packages/transport-webrtc":"5.0.16","packages/transport-websockets":"9.0.11","packages/transport-webtransport":"5.0.16","packages/upnp-nat":"2.0.10","packages/utils":"6.1.3"} +{"packages/connection-encrypter-plaintext":"2.0.10","packages/connection-encrypter-tls":"2.0.10","packages/crypto":"5.0.6","packages/interface":"2.2.0","packages/interface-compliance-tests":"6.1.8","packages/interface-internal":"2.0.10","packages/kad-dht":"14.1.0","packages/keychain":"5.0.9","packages/libp2p":"2.2.1","packages/logger":"5.1.3","packages/metrics-devtools":"1.1.8","packages/metrics-prometheus":"4.2.4","packages/metrics-simple":"1.2.6","packages/multistream-select":"6.0.8","packages/peer-collections":"6.0.10","packages/peer-discovery-bootstrap":"11.0.10","packages/peer-discovery-mdns":"11.0.10","packages/peer-id":"5.0.7","packages/peer-record":"8.0.10","packages/peer-store":"11.0.10","packages/pnet":"2.0.10","packages/protocol-autonat":"2.0.10","packages/protocol-dcutr":"2.0.10","packages/protocol-echo":"2.1.1","packages/protocol-fetch":"2.0.10","packages/protocol-identify":"3.0.10","packages/protocol-perf":"4.0.10","packages/protocol-ping":"2.0.10","packages/pubsub":"10.0.10","packages/pubsub-floodsub":"10.1.8","packages/record":"4.0.4","packages/stream-multiplexer-mplex":"11.0.10","packages/transport-circuit-relay-v2":"3.1.0","packages/transport-memory":"0.0.0","packages/transport-tcp":"10.0.11","packages/transport-webrtc":"5.0.16","packages/transport-websockets":"9.0.11","packages/transport-webtransport":"5.0.16","packages/upnp-nat":"2.0.10","packages/utils":"6.1.3"} diff --git a/.release-please.json b/.release-please.json index 07baf97322..6fb8596cd7 100644 --- a/.release-please.json +++ b/.release-please.json @@ -42,6 +42,7 @@ "packages/record": {}, "packages/stream-multiplexer-mplex": {}, "packages/transport-circuit-relay-v2": {}, + "packages/transport-memory": {}, "packages/transport-tcp": {}, "packages/transport-webrtc": {}, "packages/transport-websockets": {}, diff --git a/packages/transport-memory/.aegir.js b/packages/transport-memory/.aegir.js new file mode 100644 index 0000000000..13d33c8d5c --- /dev/null +++ b/packages/transport-memory/.aegir.js @@ -0,0 +1,6 @@ +/** @type {import('aegir').PartialOptions} */ +export default { + build: { + bundlesizeMax: '15KB' + } +} diff --git a/packages/transport-memory/LICENSE b/packages/transport-memory/LICENSE new file mode 100644 index 0000000000..20ce483c86 --- /dev/null +++ b/packages/transport-memory/LICENSE @@ -0,0 +1,4 @@ +This project is dual licensed under MIT and Apache-2.0. + +MIT: https://www.opensource.org/licenses/mit +Apache-2.0: https://www.apache.org/licenses/license-2.0 diff --git a/packages/transport-memory/LICENSE-APACHE b/packages/transport-memory/LICENSE-APACHE new file mode 100644 index 0000000000..14478a3b60 --- /dev/null +++ b/packages/transport-memory/LICENSE-APACHE @@ -0,0 +1,5 @@ +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/packages/transport-memory/LICENSE-MIT b/packages/transport-memory/LICENSE-MIT new file mode 100644 index 0000000000..72dc60d84b --- /dev/null +++ b/packages/transport-memory/LICENSE-MIT @@ -0,0 +1,19 @@ +The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/packages/transport-memory/README.md b/packages/transport-memory/README.md new file mode 100644 index 0000000000..ce955ad2a1 --- /dev/null +++ b/packages/transport-memory/README.md @@ -0,0 +1,86 @@ +# @libp2p/tcp + +[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) +[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) +[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p) +[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p/main.yml?branch=main\&style=flat-square)](https://github.com/libp2p/js-libp2p/actions/workflows/main.yml?query=branch%3Amain) + +> A memory transport for libp2p + +# About + + + +A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) +that operates in-memory only. + +This is intended for testing and can only be used to connect two libp2p nodes +that are running in the same process. + +## Example + +```TypeScript +import { createLibp2p } from 'libp2p' +import { memory } from '@libp2p/memory' +import { multiaddr } from '@multiformats/multiaddr' + +const listener = await createLibp2p({ + addresses: { + listen: [ + '/memory/address-a' + ] + }, + transports: [ + memory() + ] +}) + +const dialer = await createLibp2p({ + transports: [ + memory() + ] +}) + +const ma = multiaddr('/memory/address-a') + +// dial the listener, timing out after 10s +const connection = await dialer.dial(ma, { + signal: AbortSignal.timeout(10_000) +}) + +// use connection... +``` + +# Install + +```console +$ npm i @libp2p/tcp +``` + +# API Docs + +- + +# License + +Licensed under either of + +- Apache 2.0, ([LICENSE-APACHE](https://github.com/libp2p/js-libp2p/blob/main/packages/transport-tcp/LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](https://github.com/libp2p/js-libp2p/blob/main/packages/transport-tcp/LICENSE-MIT) / ) + +# Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/packages/transport-memory/package.json b/packages/transport-memory/package.json new file mode 100644 index 0000000000..0269be5766 --- /dev/null +++ b/packages/transport-memory/package.json @@ -0,0 +1,75 @@ +{ + "name": "@libp2p/memory", + "version": "0.0.0", + "description": "A memory transport for libp2p", + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/libp2p/js-libp2p/tree/main/packages/transport-tcp#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p.git" + }, + "bugs": { + "url": "https://github.com/libp2p/js-libp2p/issues" + }, + "publishConfig": { + "access": "public", + "provenance": true + }, + "type": "module", + "types": "./dist/src/index.d.ts", + "files": [ + "src", + "dist", + "!dist/test", + "!**/*.tsbuildinfo" + ], + "exports": { + ".": { + "types": "./dist/src/index.d.ts", + "import": "./dist/src/index.js" + } + }, + "eslintConfig": { + "extends": "ipfs", + "parserOptions": { + "project": true, + "sourceType": "module" + } + }, + "scripts": { + "clean": "aegir clean", + "lint": "aegir lint", + "dep-check": "aegir dep-check", + "doc-check": "aegir doc-check", + "build": "aegir build", + "test": "aegir test -t node -t electron-main", + "test:chrome": "aegir test -t browser -f ./dist/test/browser.js --cov", + "test:chrome-webworker": "aegir test -t webworker -f ./dist/test/browser.js", + "test:firefox": "aegir test -t browser -f ./dist/test/browser.js -- --browser firefox", + "test:firefox-webworker": "aegir test -t webworker -f ./dist/test/browser.js -- --browser firefox", + "test:node": "aegir test -t node --cov", + "test:electron-main": "aegir test -t electron-main" + }, + "dependencies": { + "@libp2p/interface": "^2.2.0", + "@multiformats/multiaddr": "^12.2.3", + "@multiformats/multiaddr-matcher": "^1.5.0", + "@types/sinon": "^17.0.3", + "delay": "^6.0.0", + "it-map": "^3.1.1", + "it-pushable": "^3.2.3", + "nanoid": "^5.0.8", + "uint8arraylist": "^2.4.8" + }, + "devDependencies": { + "@libp2p/crypto": "^5.0.6", + "@libp2p/interface-compliance-tests": "^6.1.8", + "@libp2p/logger": "^5.1.3", + "@libp2p/peer-id": "^5.0.7", + "aegir": "^44.0.1" + }, + "browser": { + "./dist/src/tcp.js": "./dist/src/tcp.browser.js" + }, + "sideEffects": false +} diff --git a/packages/transport-memory/src/connections.ts b/packages/transport-memory/src/connections.ts new file mode 100644 index 0000000000..a28ace40a8 --- /dev/null +++ b/packages/transport-memory/src/connections.ts @@ -0,0 +1,176 @@ +/** + * @packageDocumentation + * + * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) + * that operates in-memory only. + * + * This is intended for testing and can only be used to connect two libp2p nodes + * that are running in the same process. + * + * @example + * + * ```TypeScript + * import { createLibp2p } from 'libp2p' + * import { memory } from '@libp2p/memory' + * import { multiaddr } from '@multiformats/multiaddr' + * + * const listener = await createLibp2p({ + * addresses: { + * listen: [ + * '/memory/node-a' + * ] + * }, + * transports: [ + * memory() + * ] + * }) + * + * const dialer = await createLibp2p({ + * transports: [ + * memory() + * ] + * }) + * + * const ma = multiaddr('/memory/node-a') + * + * // dial the listener, timing out after 10s + * const connection = await dialer.dial(ma, { + * signal: AbortSignal.timeout(10_000) + * }) + * + * // use connection... + * ``` + */ + +import { ConnectionFailedError } from '@libp2p/interface' +import { multiaddr } from '@multiformats/multiaddr' +import delay from 'delay' +import map from 'it-map' +import { pushable } from 'it-pushable' +import type { MemoryTransportComponents } from './index.js' +import type { MultiaddrConnection, PeerId } from '@libp2p/interface' +import type { Uint8ArrayList } from 'uint8arraylist' + +export const connections = new Map() + +interface MemoryConnectionHandler { + (maConn: MultiaddrConnection): void +} + +interface MemoryConnectionInit { + onConnection: MemoryConnectionHandler + address: string +} + +export class MemoryConnection { + private readonly components: MemoryTransportComponents + private readonly init: MemoryConnectionInit + private readonly connections: Set + private latency: number + + constructor (components: MemoryTransportComponents, init: MemoryConnectionInit) { + this.components = components + this.init = init + this.connections = new Set() + this.latency = 0 + } + + async dial (dialingPeerId: PeerId): Promise { + const dialerPushable = pushable() + const listenerPushable = pushable() + const self = this + + const dialer: MultiaddrConnection = { + source: (async function * () { + yield * map(listenerPushable, async buf => { + await delay(self.latency) + return buf + }) + })(), + sink: async (source) => { + for await (const buf of source) { + dialerPushable.push(buf) + } + }, + close: async () => { + dialerPushable.end() + this.connections.delete(dialer) + dialer.timeline.close = Date.now() + + listenerPushable.end() + this.connections.delete(listener) + listener.timeline.close = Date.now() + }, + abort: (err) => { + dialerPushable.end(err) + this.connections.delete(dialer) + dialer.timeline.close = Date.now() + + listenerPushable.end(err) + this.connections.delete(listener) + listener.timeline.close = Date.now() + }, + timeline: { + open: Date.now() + }, + remoteAddr: multiaddr(`${this.init.address}/p2p/${this.components.peerId}`), + log: this.components.logger.forComponent(`libp2p:memory:outgoing:${1}`) + } + + const listener: MultiaddrConnection = { + source: (async function * () { + yield * map(dialerPushable, async buf => { + await delay(self.latency) + return buf + }) + })(), + sink: async (source) => { + for await (const buf of source) { + listenerPushable.push(buf) + } + }, + close: async () => { + listenerPushable.end() + this.connections.delete(listener) + listener.timeline.close = Date.now() + + dialerPushable.end() + this.connections.delete(dialer) + dialer.timeline.close = Date.now() + }, + abort: (err) => { + listenerPushable.end(err) + this.connections.delete(listener) + listener.timeline.close = Date.now() + + dialerPushable.end(err) + this.connections.delete(dialer) + dialer.timeline.close = Date.now() + }, + timeline: { + open: Date.now() + }, + remoteAddr: multiaddr(`${this.init.address}-outgoing/p2p/${dialingPeerId}`), + log: this.components.logger.forComponent(`libp2p:memory:outgoing:${1}`) + } + + this.connections.add(dialer) + this.connections.add(listener) + + await delay(this.latency) + + this.init.onConnection(listener) + + return dialer + } + + close (): void { + [...this.connections].forEach(maConn => { + maConn.abort(new ConnectionFailedError('Memory Connection closed')) + }) + } + + setLatency (ms: number): void { + this.latency = ms + } +} diff --git a/packages/transport-memory/src/index.ts b/packages/transport-memory/src/index.ts new file mode 100644 index 0000000000..cab3478b87 --- /dev/null +++ b/packages/transport-memory/src/index.ts @@ -0,0 +1,62 @@ +/** + * @packageDocumentation + * + * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) + * that operates in-memory only. + * + * This is intended for testing and can only be used to connect two libp2p nodes + * that are running in the same process. + * + * @example + * + * ```TypeScript + * import { createLibp2p } from 'libp2p' + * import { memory } from '@libp2p/memory' + * import { multiaddr } from '@multiformats/multiaddr' + * + * const listener = await createLibp2p({ + * addresses: { + * listen: [ + * '/memory/address-a' + * ] + * }, + * transports: [ + * memory() + * ] + * }) + * + * const dialer = await createLibp2p({ + * transports: [ + * memory() + * ] + * }) + * + * const ma = multiaddr('/memory/address-a') + * + * // dial the listener, timing out after 10s + * const connection = await dialer.dial(ma, { + * signal: AbortSignal.timeout(10_000) + * }) + * + * // use connection... + * ``` + */ + +import { MemoryTransport } from './memory.js' +import type { Transport, ComponentLogger, UpgraderOptions, PeerId } from '@libp2p/interface' + +export interface MemoryTransportComponents { + peerId: PeerId + logger: ComponentLogger +} + +export interface MemoryTransportInit { + upgraderOptions?: UpgraderOptions + inboundUpgradeTimeout?: number +} + +export function memory (init?: MemoryTransportInit): (components: MemoryTransportComponents) => Transport { + return (components) => { + return new MemoryTransport(components, init) + } +} diff --git a/packages/transport-memory/src/listener.ts b/packages/transport-memory/src/listener.ts new file mode 100644 index 0000000000..ffedea10dc --- /dev/null +++ b/packages/transport-memory/src/listener.ts @@ -0,0 +1,135 @@ +/** + * @packageDocumentation + * + * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) + * that operates in-memory only. + * + * This is intended for testing and can only be used to connect two libp2p nodes + * that are running in the same process. + * + * @example + * + * ```TypeScript + * import { createLibp2p } from 'libp2p' + * import { memory } from '@libp2p/memory' + * import { multiaddr } from '@multiformats/multiaddr' + * + * const listener = await createLibp2p({ + * addresses: { + * listen: [ + * '/memory/node-a' + * ] + * }, + * transports: [ + * memory() + * ] + * }) + * + * const dialer = await createLibp2p({ + * transports: [ + * memory() + * ] + * }) + * + * const ma = multiaddr('/memory/node-a') + * + * // dial the listener, timing out after 10s + * const connection = await dialer.dial(ma, { + * signal: AbortSignal.timeout(10_000) + * }) + * + * // use connection... + * ``` + */ + +import { ListenError, TypedEventEmitter } from '@libp2p/interface' +import { multiaddr } from '@multiformats/multiaddr' +import { nanoid } from 'nanoid' +import { MemoryConnection, connections } from './connections.js' +import type { MemoryTransportComponents, MemoryTransportInit } from './index.js' +import type { Listener, CreateListenerOptions, ListenerEvents, MultiaddrConnection, UpgraderOptions } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' + +export interface MemoryTransportListenerInit extends CreateListenerOptions, MemoryTransportInit { + upgraderOptions?: UpgraderOptions +} + +export class MemoryTransportListener extends TypedEventEmitter implements Listener { + private listenAddr?: Multiaddr + private connection?: MemoryConnection + private readonly components: MemoryTransportComponents + private readonly init: MemoryTransportListenerInit + + constructor (components: MemoryTransportComponents, init: MemoryTransportListenerInit) { + super() + + this.components = components + this.init = init + } + + async listen (ma: Multiaddr): Promise { + const [[, value]] = ma.stringTuples() + + const address = `/memory/${value ?? nanoid()}` + + if (value != null && connections.has(address)) { + throw new ListenError(`Memory address ${address} already in use`) + } + + this.connection = new MemoryConnection(this.components, { + onConnection: this.onConnection.bind(this), + address + }) + this.listenAddr = multiaddr(address) + + connections.set(address, this.connection) + + queueMicrotask(() => { + this.safeDispatchEvent('listening') + }) + } + + onConnection (maConn: MultiaddrConnection): void { + let signal: AbortSignal | undefined + + if (this.init.inboundUpgradeTimeout != null) { + signal = AbortSignal.timeout(this.init.inboundUpgradeTimeout) + } + + this.init.upgrader.upgradeInbound(maConn, { + ...this.init.upgraderOptions, + signal + }) + .then(connection => { + this.init.handler?.(connection) + this.safeDispatchEvent('connection', { + detail: connection + }) + }) + .catch(err => { + maConn.abort(err) + }) + } + + getAddrs (): Multiaddr[] { + if (this.listenAddr == null) { + return [] + } + + return [ + this.listenAddr + ] + } + + async close (): Promise { + this.connection?.close() + + if (this.listenAddr != null) { + connections.delete(this.listenAddr.toString()) + } + + queueMicrotask(() => { + this.safeDispatchEvent('close') + }) + } +} diff --git a/packages/transport-memory/src/memory.ts b/packages/transport-memory/src/memory.ts new file mode 100644 index 0000000000..315509cba5 --- /dev/null +++ b/packages/transport-memory/src/memory.ts @@ -0,0 +1,113 @@ +/** + * @packageDocumentation + * + * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) + * that operates in-memory only. + * + * This is intended for testing and can only be used to connect two libp2p nodes + * that are running in the same process. + * + * @example + * + * ```TypeScript + * import { createLibp2p } from 'libp2p' + * import { memory } from '@libp2p/memory' + * import { multiaddr } from '@multiformats/multiaddr' + * + * const listener = await createLibp2p({ + * addresses: { + * listen: [ + * '/memory/node-a' + * ] + * }, + * transports: [ + * memory() + * ] + * }) + * + * const dialer = await createLibp2p({ + * transports: [ + * memory() + * ] + * }) + * + * const ma = multiaddr('/memory/node-a') + * + * // dial the listener, timing out after 10s + * const connection = await dialer.dial(ma, { + * signal: AbortSignal.timeout(10_000) + * }) + * + * // use connection... + * ``` + */ + +import { ConnectionFailedError, serviceCapabilities, transportSymbol } from '@libp2p/interface' +import { Memory } from '@multiformats/multiaddr-matcher' +import { connections } from './connections.js' +import { MemoryTransportListener } from './listener.js' +import type { MemoryTransportComponents, MemoryTransportInit } from './index.js' +import type { Connection, Transport, Listener, CreateListenerOptions, DialTransportOptions } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' + +export class MemoryTransport implements Transport { + private readonly components: MemoryTransportComponents + private readonly init: MemoryTransportInit + + constructor (components: MemoryTransportComponents, init: MemoryTransportInit = {}) { + this.components = components + this.init = init + } + + readonly [transportSymbol] = true + + readonly [Symbol.toStringTag] = '@libp2p/memory' + + readonly [serviceCapabilities]: string[] = [ + '@libp2p/transport' + ] + + async dial (ma: Multiaddr, options: DialTransportOptions): Promise { + options.signal?.throwIfAborted() + + const memoryConnection = connections.get(`${ma.getPeerId() == null ? ma : ma.decapsulate('/p2p')}`) + + if (memoryConnection == null) { + throw new ConnectionFailedError(`No memory listener found at ${ma}`) + } + + const maConn = await memoryConnection.dial(this.components.peerId) + + try { + options.signal?.throwIfAborted() + + return await options.upgrader.upgradeOutbound(maConn, { + ...options, + ...this.init.upgraderOptions + }) + } catch (err: any) { + maConn.abort(err) + throw err + } + } + + /** + * Creates a TCP listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + */ + createListener (options: CreateListenerOptions): Listener { + return new MemoryTransportListener(this.components, { + ...options, + ...this.init + }) + } + + listenFilter (multiaddrs: Multiaddr[]): Multiaddr[] { + return multiaddrs.filter(ma => Memory.exactMatch(ma)) + } + + dialFilter (multiaddrs: Multiaddr[]): Multiaddr[] { + return this.listenFilter(multiaddrs) + } +} diff --git a/packages/transport-memory/test/compliance.spec.ts b/packages/transport-memory/test/compliance.spec.ts new file mode 100644 index 0000000000..6620843256 --- /dev/null +++ b/packages/transport-memory/test/compliance.spec.ts @@ -0,0 +1,69 @@ +import { generateKeyPair } from '@libp2p/crypto/keys' +import tests from '@libp2p/interface-compliance-tests/transport' +import { defaultLogger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { multiaddr } from '@multiformats/multiaddr' +import { connections } from '../src/connections.js' +import { memory } from '../src/index.js' +import type { MemoryTransportListener } from '../src/listener.js' +import type { Listener } from '@libp2p/interface' + +describe('transport compliance tests', () => { + tests({ + async setup () { + const privateKey = await generateKeyPair('Ed25519') + + const transport = memory()({ + logger: defaultLogger(), + peerId: peerIdFromPrivateKey(privateKey) + }) + const addrs = [ + multiaddr('/memory/addr-1'), + multiaddr('/memory/addr-2'), + multiaddr('/memory/addr-3'), + multiaddr('/memory/addr-4') + ] + + let delayMs = 0 + const delayedCreateListener = (options: any): Listener => { + const listener = transport.createListener(options) as unknown as MemoryTransportListener + + const onConnection = listener.onConnection.bind(listener) + + listener.onConnection = (maConn: any) => { + setTimeout(() => { + onConnection(maConn) + }, delayMs) + } + + return listener + } + + const transportProxy = new Proxy(transport, { + // @ts-expect-error cannot access props with a string + get: (_, prop) => prop === 'createListener' ? delayedCreateListener : transport[prop] + }) + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (ms: number) { + delayMs = ms + connections.get('/memory/addr-1')?.setLatency(ms) + connections.get('/memory/addr-2')?.setLatency(ms) + connections.get('/memory/addr-3')?.setLatency(ms) + connections.get('/memory/addr-4')?.setLatency(ms) + }, + restore () { + delayMs = 0 + connections.get('/memory/addr-1')?.setLatency(0) + connections.get('/memory/addr-2')?.setLatency(0) + connections.get('/memory/addr-3')?.setLatency(0) + connections.get('/memory/addr-4')?.setLatency(0) + } + } + + return { dialer: transportProxy, listener: transportProxy, listenAddrs: addrs, dialAddrs: addrs, connector } + }, + async teardown () {} + }) +}) diff --git a/packages/transport-memory/tsconfig.json b/packages/transport-memory/tsconfig.json new file mode 100644 index 0000000000..85ad88030d --- /dev/null +++ b/packages/transport-memory/tsconfig.json @@ -0,0 +1,27 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src", + "test" + ], + "references": [ + { + "path": "../crypto" + }, + { + "path": "../interface" + }, + { + "path": "../interface-compliance-tests" + }, + { + "path": "../logger" + }, + { + "path": "../peer-id" + } + ] +} diff --git a/packages/transport-memory/typedoc.json b/packages/transport-memory/typedoc.json new file mode 100644 index 0000000000..f599dc728d --- /dev/null +++ b/packages/transport-memory/typedoc.json @@ -0,0 +1,5 @@ +{ + "entryPoints": [ + "./src/index.ts" + ] +}