Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(@libp2p/protocol-perf): Implement perf protocol #1604

Merged
merged 38 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c524755
feat: implement perf protocol
maschad Jun 19, 2023
0898dd0
test: only runs test in node environment
maschad Jun 21, 2023
2458fbc
test: use CI to output test results
maschad Jun 21, 2023
8aee800
chore: linting fixes + remove unused deps
maschad Jun 21, 2023
2ec297d
Merge branch 'master' into marco/perf
maschad Jun 21, 2023
8ea1870
test: updated workflow
maschad Jun 21, 2023
4de5595
ci: updated workflow to fail if bandwidth drops below 20%
maschad Jun 23, 2023
9a7f9d8
ci: updated job to generate report for performance data
maschad Jun 27, 2023
65bfaf3
Merge branch 'master' into marco/perf
maschad Jun 27, 2023
9bed517
chore: linting + updated ci job
maschad Jun 28, 2023
c1b262f
ci: updated saving performance data
maschad Jun 28, 2023
fe79a72
ci: update rendering
maschad Jun 28, 2023
d28dd92
Merge branch 'master' into marco/perf
maschad Jun 28, 2023
9dd0248
test: update perf test to take an average of 5 runs + standardize bytes
maschad Jun 29, 2023
6661424
ci: Added units to output matrix
maschad Jun 29, 2023
79cbe19
test: updated previous bandwidths
maschad Jun 29, 2023
118703a
ci: updated save performance action
maschad Jun 29, 2023
7eb1a79
wip
maschad Jul 26, 2023
e21cda8
Merge branch 'master' into marco/perf
maschad Jul 26, 2023
b1d89bd
feat: added perf test and output to JSON
maschad Jul 26, 2023
ccd5f26
chore: linting fixes
maschad Jul 26, 2023
cf0981b
Merge branch 'master' into marco/perf
maschad Jul 28, 2023
7f9c716
deps(dev): upgrade aegir
maschad Jul 28, 2023
35c79a4
deps: install missing deps
maschad Jul 28, 2023
ec98ddf
Merge branch 'master' into marco/perf
maschad Jul 28, 2023
8ba8db3
refactor: refactored main runner
maschad Jul 30, 2023
59dc3aa
Merge branch 'master' into marco/perf
maschad Jul 30, 2023
f9b91cb
feat: updated main to use fake peer id for client mode perf
maschad Aug 2, 2023
68dfd68
feat: fixed issue with run server + added stable peer id
maschad Aug 4, 2023
0bff620
refactor: Updated tests + perf signature + linting
maschad Aug 4, 2023
94caca8
Merge branch 'master' into marco/perf
maschad Aug 4, 2023
fb914fd
deps: updated deps
maschad Aug 4, 2023
a28ff46
feat: address perf review comments
maschad Aug 5, 2023
8b056fb
Merge branch 'master' into marco/perf
maschad Aug 5, 2023
5e62a7b
refactor: removed maxInbound and outbound streams on handler + bandwi…
maschad Aug 8, 2023
3ad9ea3
Merge branch 'master' into marco/perf
maschad Aug 8, 2023
0a4eae5
refactor: rename to protocol-perf + update package documentation
maschad Aug 10, 2023
b2da056
Merge branch 'master' into marco/perf
maschad Aug 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/perf/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
This project is dual licensed under MIT and Apache-2.0.

MIT: https://www.opensource.org/licenses/mit
Apache-2.0: https://www.apache.org/licenses/license-2.0
5 changes: 5 additions & 0 deletions packages/perf/LICENSE-APACHE
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
19 changes: 19 additions & 0 deletions packages/perf/LICENSE-MIT
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
The MIT License (MIT)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
24 changes: 24 additions & 0 deletions packages/perf/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# @libp2p/perf

[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io)
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p)
[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p/main.yml?branch=master\&style=flat-square)](https://github.com/libp2p/js-libp2p/actions/workflows/main.yml?query=branch%3Amaster)

> Implementation of the [Perf Protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md)

## API Docs

- <https://libp2p.github.io/js-libp2p/modules/_libp2p_perf.html>


## License

Licensed under either of

- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT ([LICENSE-MIT](LICENSE-MIT) / <http://opensource.org/licenses/MIT>)

## Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
61 changes: 61 additions & 0 deletions packages/perf/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"name": "@libp2p/perf",
"version": "1.0.0",
"description": "Implementation of Perf Protocol",
"license": "Apache-2.0 OR MIT",
"homepage": "https://github.com/libp2p/js-libp2p/tree/master/packages/perf#readme",
"repository": {
"type": "git",
"url": "git+https://github.com/libp2p/js-libp2p.git"
},
"bugs": {
"url": "https://github.com/libp2p/js-libp2p/issues"
},
"type": "module",
"author": "@maschad / @marcopolo",
"files": [
"src",
"dist",
"!dist/test",
"!**/*.tsbuildinfo"
],
"eslintConfig": {
"extends": "ipfs",
"parserOptions": {
"sourceType": "module"
}
},
"scripts": {
"start": "node dist/src/main.js",
"build": "aegir build",
"test": "aegir test",
"clean": "aegir clean",
"lint": "aegir lint",
"test:chrome": "aegir test -t browser --cov",
"test:chrome-webworker": "aegir test -t webworker",
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"dep-check": "aegir dep-check",
"renderResults": "node dist/src/renderResults.js"
},
"dependencies": {
"@chainsafe/libp2p-yamux": "^5.0.0",
"@libp2p/crypto": "^2.0.0",
"@libp2p/interface": "^0.1.0",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/interface-internal": "^0.1.0",
"@libp2p/interfaces": "3.3.2",
"@libp2p/logger": "^3.0.0",
"@libp2p/peer-id-factory": "3.0.0",
"@libp2p/tcp": "^8.0.0",
"@multiformats/multiaddr": "^12.1.5",
"libp2p": "^0.46.1",
"p-wait-for": "^5.0.2",
"uint8arrays": "^4.0.6",
"yargs": "^17.7.2"
},
"devDependencies": {
"aegir": "^40.0.8"
}
}
2 changes: 2 additions & 0 deletions packages/perf/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const PROTOCOL_NAME = '/perf/1.0.0'
export const WRITE_BLOCK_SIZE = BigInt(64 << 10)
192 changes: 192 additions & 0 deletions packages/perf/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/**
* @packageDocumentation
*
* The `performanceService` implements the [perf protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md), which is used to measure performance within and across libp2p implementations
* addresses.
*
* @example
*
* ```typescript
* import { createLibp2p } from 'libp2p'
* import { circuitRelayTransport } from 'libp2p/circuit-relay'
*
* const node = await createLibp2p({
* service: [
* perfService()
* ]
* })
* ```
*
* The `measurePerformance` function can be used to measure the latency and throughput of a connection.
* server. This will not work in browsers.
*
* @example
*
* ```typescript
* import { createLibp2p } from 'libp2p'
* import { circuitRelayServer } from 'libp2p/circuit-relay'
*
* const node = await createLibp2p({
* services: [
* perf: perfService()
* ]
* })
*
* const connection = await node.dial(multiaddr(multiaddrAddress))
*
* await node.services.perf.measurePerformance(connection, BigInt(uploadBytes), BigInt(downloadBytes))
*
* ```
*/
maschad marked this conversation as resolved.
Show resolved Hide resolved

import { logger } from '@libp2p/logger'
maschad marked this conversation as resolved.
Show resolved Hide resolved
import { PROTOCOL_NAME, WRITE_BLOCK_SIZE } from './constants.js'
import type { Connection } from '@libp2p/interface/connection'
import type { Startable } from '@libp2p/interface/startable'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
import type { AbortOptions } from '@libp2p/interfaces'

const log = logger('libp2p:perf')

export const defaultInit: PerfServiceInit = {
protocolName: '/perf/1.0.0',
writeBlockSize: BigInt(64 << 10)
}

export interface PerfService {
measurePerformance: (startTime: number, connection: Connection, sendBytes: bigint, recvBytes: bigint, options?: AbortOptions) => Promise<number>
}

export interface PerfServiceInit {
protocolName?: string
maxInboundStreams?: number
maxOutboundStreams?: number
timeout?: number
writeBlockSize?: bigint
}

export interface PerfServiceComponents {
registrar: Registrar
connectionManager: ConnectionManager
}

class DefaultPerfService implements Startable, PerfService {
public readonly protocol: string
private readonly components: PerfServiceComponents
private started: boolean
private readonly databuf: ArrayBuffer
private readonly writeBlockSize: bigint

constructor (components: PerfServiceComponents, init: PerfServiceInit) {
this.components = components
this.started = false
this.protocol = init.protocolName ?? PROTOCOL_NAME
this.writeBlockSize = init.writeBlockSize ?? WRITE_BLOCK_SIZE
this.databuf = new ArrayBuffer(Number(init.writeBlockSize))
}

async start (): Promise<void> {
await this.components.registrar.handle(this.protocol, (data: IncomingStreamData) => {
void this.handleMessage(data).catch((err) => {
log.error('error handling perf protocol message', err)
})
})
this.started = true
}

async stop (): Promise<void> {
await this.components.registrar.unhandle(this.protocol)
this.started = false
}

isStarted (): boolean {
return this.started
}

async handleMessage (data: IncomingStreamData): Promise<void> {
const { stream } = data

const writeBlockSize = this.writeBlockSize

let bytesToSendBack: bigint | null = null

for await (const buf of stream.source) {
if (bytesToSendBack === null) {
maschad marked this conversation as resolved.
Show resolved Hide resolved
bytesToSendBack = BigInt(buf.getBigUint64(0, false))
}
// Ingest all the bufs and wait for the read side to close
}

const uint8Buf = new Uint8Array(this.databuf)

if (bytesToSendBack === null) {
throw new Error('bytesToSendBack was not set')
}

await stream.sink(async function * () {
while (bytesToSendBack > 0n) {
let toSend: bigint = writeBlockSize
if (toSend > bytesToSendBack) {
toSend = bytesToSendBack
}
bytesToSendBack = bytesToSendBack - toSend
yield uint8Buf.subarray(0, Number(toSend))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can see we're always going to be sending byte arrays made up of zeroes in this protocol?

If there's any data compression applied could this affect benchmark outputs?

Would sending random data be better?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both random bytes and zeros is fine by the specification. See past discussion by you, @achingbrain.

libp2p/specs#478 (comment)

All other implementations use zeroes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero is fine, random bytes can also be fine (as long as it's not a different set of random bytes, because then you may be limited by how fast you can generate random bytes).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at me making the same point twice 😆

you may be limited by how fast you can generate random bytes

You can generate them ahead of time so this is solvable. I haven't measured it but in my head at least any network operation will be orders of magnitude slower than a pRNG?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's measure it! On my machine (M1 Max):

$  dd if=/dev/urandom of=/dev/null bs=1500 count=100
100+0 records in
100+0 records out
150000 bytes (150 kB, 146 KiB) copied, 0.000466 s, 322 MB/s

That's 2.5 Gigabits/s, while fast is not fast enough to saturate a fairly common 10 gbps line.

(Yes you can fiddle with blocksizes, but I think 1500 is fair as it's a common MTU size.)

But what's the issue with sending zero bytes? Is it that you're afraid we'll silently introduce compression on a transport and thus not correctly measure throughput? If that's the case sending zero bytes is actually ideal because it would be obvious if there is compression happening.

There's also the question of arbitrarily compressing at a transport level. That kind of thing opens you up the TLS CRIME style attacks. So I don't think we should support compression without being application aware.

}
}())
}

async measurePerformance (startTime: number, connection: Connection, sendBytes: bigint, recvBytes: bigint, options: AbortOptions = {}): Promise<number> {
log('opening stream on protocol %s to %p', this.protocol, connection.remotePeer)

const uint8Buf = new Uint8Array(this.databuf)
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved

const writeBlockSize = this.writeBlockSize
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved

const stream = await connection.newStream([this.protocol])

// Convert sendBytes to uint64 big endian buffer
const view = new DataView(this.databuf)
view.setBigInt64(0, recvBytes, false)

log('sending %i bytes to %p', sendBytes, connection.remotePeer)
try {
await stream.sink((async function * () {
// Send the number of bytes to receive
yield uint8Buf.subarray(0, 8)
// Send the number of bytes to send
while (sendBytes > 0n) {
let toSend: bigint = writeBlockSize
if (toSend > sendBytes) {
toSend = sendBytes
}
sendBytes = sendBytes - toSend
yield uint8Buf.subarray(0, Number(toSend))
}
})())

// Read the received bytes
let actualRecvdBytes = BigInt(0)
for await (const buf of stream.source) {
actualRecvdBytes += BigInt(buf.length)
}

if (actualRecvdBytes !== recvBytes) {
throw new Error(`Expected to receive ${recvBytes} bytes, but received ${actualRecvdBytes}`)
}
} catch (err) {
log('error sending %i bytes to %p: %s', sendBytes, connection.remotePeer, err)
throw err
} finally {
log('performed %s to %p', this.protocol, connection.remotePeer)
await stream.close()
}

// Return the latency
return Date.now() - startTime
}
}

export function perfService (init: PerfServiceInit = {}): (components: PerfServiceComponents) => PerfService {
return (components) => new DefaultPerfService(components, init)
}
Loading
Loading