Skip to content

Commit

Permalink
feat: add latency option to memory transport (#2810)
Browse files Browse the repository at this point in the history
Allows slowing all network traffic by the passed amount of ms.
  • Loading branch information
achingbrain authored Nov 7, 2024
1 parent 5feba91 commit 050b01f
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 124 deletions.
5 changes: 2 additions & 3 deletions packages/transport-memory/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@
"uint8arraylist": "^2.4.8"
},
"devDependencies": {
"@libp2p/crypto": "^5.0.6",
"@libp2p/interface-compliance-tests": "^6.1.8",
"@libp2p/logger": "^5.1.3",
"@libp2p/peer-id": "^5.0.7",
"aegir": "^44.0.1"
"aegir": "^44.0.1",
"sinon-ts": "^2.0.0"
},
"browser": {
"./dist/src/tcp.js": "./dist/src/tcp.browser.js"
Expand Down
12 changes: 4 additions & 8 deletions packages/transport-memory/src/connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import { multiaddr } from '@multiformats/multiaddr'
import delay from 'delay'
import map from 'it-map'
import { pushable } from 'it-pushable'
import type { MemoryTransportComponents } from './index.js'
import type { MemoryTransportComponents, MemoryTransportInit } from './index.js'
import type { MultiaddrConnection, PeerId } from '@libp2p/interface'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand All @@ -57,7 +57,7 @@ interface MemoryConnectionHandler {
(maConn: MultiaddrConnection): void
}

interface MemoryConnectionInit {
interface MemoryConnectionInit extends MemoryTransportInit {
onConnection: MemoryConnectionHandler
address: string
}
Expand All @@ -66,13 +66,13 @@ export class MemoryConnection {
private readonly components: MemoryTransportComponents
private readonly init: MemoryConnectionInit
private readonly connections: Set<MultiaddrConnection>
private latency: number
private readonly latency: number

constructor (components: MemoryTransportComponents, init: MemoryConnectionInit) {
this.components = components
this.init = init
this.connections = new Set()
this.latency = 0
this.latency = init.latency ?? 0
}

async dial (dialingPeerId: PeerId): Promise<MultiaddrConnection> {
Expand Down Expand Up @@ -169,8 +169,4 @@ export class MemoryConnection {
maConn.abort(new ConnectionFailedError('Memory Connection closed'))
})
}

setLatency (ms: number): void {
this.latency = ms
}
}
7 changes: 7 additions & 0 deletions packages/transport-memory/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ export interface MemoryTransportComponents {
export interface MemoryTransportInit {
upgraderOptions?: UpgraderOptions
inboundUpgradeTimeout?: number

/**
* Add this much latency in ms to every buffer sent over the transport
*
* @default 0
*/
latency?: number
}

export function memory (init?: MemoryTransportInit): (components: MemoryTransportComponents) => Transport {
Expand Down
19 changes: 19 additions & 0 deletions packages/transport-memory/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@
*
* // use connection...
* ```
*
* @example Simulating slow connections
*
* A `latency` argument can be passed to the factory. Each byte array that
* passes through the transport will be delayed by this many ms.
*
* ```TypeScript
* import { createLibp2p } from 'libp2p'
* import { memory } from '@libp2p/memory'
*
* const dialer = await createLibp2p({
* transports: [
* memory({
* latency: 100
* })
* ]
* })
* ```
*/

import { ListenError, TypedEventEmitter } from '@libp2p/interface'
Expand Down Expand Up @@ -77,6 +95,7 @@ export class MemoryTransportListener extends TypedEventEmitter<ListenerEvents> i
}

this.connection = new MemoryConnection(this.components, {
...this.init,
onConnection: this.onConnection.bind(this),
address
})
Expand Down
44 changes: 0 additions & 44 deletions packages/transport-memory/src/memory.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,3 @@
/**
* @packageDocumentation
*
* A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/)
* that operates in-memory only.
*
* This is intended for testing and can only be used to connect two libp2p nodes
* that are running in the same process.
*
* @example
*
* ```TypeScript
* import { createLibp2p } from 'libp2p'
* import { memory } from '@libp2p/memory'
* import { multiaddr } from '@multiformats/multiaddr'
*
* const listener = await createLibp2p({
* addresses: {
* listen: [
* '/memory/node-a'
* ]
* },
* transports: [
* memory()
* ]
* })
*
* const dialer = await createLibp2p({
* transports: [
* memory()
* ]
* })
*
* const ma = multiaddr('/memory/node-a')
*
* // dial the listener, timing out after 10s
* const connection = await dialer.dial(ma, {
* signal: AbortSignal.timeout(10_000)
* })
*
* // use connection...
* ```
*/

import { ConnectionFailedError, serviceCapabilities, transportSymbol } from '@libp2p/interface'
import { Memory } from '@multiformats/multiaddr-matcher'
import { connections } from './connections.js'
Expand Down
69 changes: 0 additions & 69 deletions packages/transport-memory/test/compliance.spec.ts

This file was deleted.

68 changes: 68 additions & 0 deletions packages/transport-memory/test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { defaultLogger } from '@libp2p/logger'
import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import { stubInterface } from 'sinon-ts'
import { memory } from '../src/index.js'
import type { Upgrader, Connection } from '@libp2p/interface'

describe('memory', () => {
let upgrader: Upgrader

beforeEach(async () => {
upgrader = stubInterface<Upgrader>({
upgradeInbound: async (maConn) => {
return stubInterface<Connection>()
},
upgradeOutbound: async (maConn) => {
return stubInterface<Connection>()
}
})
})

it('should dial', async () => {
const transport = memory()({
peerId: peerIdFromString('12D3KooWJRSrypvnpHgc6ZAgyCni4KcSmbV7uGRaMw5LgMKT18fq'),
logger: defaultLogger()
})
const ma = multiaddr('/memory/address-1')
const listener = transport.createListener({
upgrader
})
await listener.listen(ma)

const conn = await transport.dial(ma, {
upgrader
})

await conn.close()
await listener.close()
})

it('should dial with latency', async () => {
const latency = 1000
const transport = memory({
latency
})({
peerId: peerIdFromString('12D3KooWJRSrypvnpHgc6ZAgyCni4KcSmbV7uGRaMw5LgMKT18fq'),
logger: defaultLogger()
})
const ma = multiaddr('/memory/address-1')
const listener = transport.createListener({
upgrader
})
await listener.listen(ma)

const start = Date.now()
const conn = await transport.dial(ma, {
upgrader
})
const end = Date.now()

// +/- a bit
expect(end - start).to.be.greaterThan(latency / 1.1)

await conn.close()
await listener.close()
})
})

0 comments on commit 050b01f

Please sign in to comment.