Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/replication p2pcf #5305

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions docs-src/docs/replication-webrtc.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,32 @@ const replicationPool = await replicateWebRTC(
* To learn how to create a custom connection handler, read the source code,
* it is pretty simple.
*/
connectionHandlerCreator: getConnectionHandlerSimplePeer(
'wss://example.com:8080',
// only in Node.js, we need the wrtc library
// because Node.js does not contain the WebRTC API.
require('wrtc')
),
connectionHandlerCreator: getConnectionHandlerSimplePeer('wss://example.com:8080'),
pull: {},
push: {}
}
);
replicationPool.error$.subscribe(err => { /* ... */ });
replicationPool.cancel();

```

### Polyfill the WebRTC API in Node.js

While all modern browsers support the WebRTC API, it is missing in Node.js which will throw the error `No WebRTC support: Specify opts.wrtc option in this environment`. Therefore you have to polyfill it with a compatible WebRTC polyfill. It is recommended to use the [node-datachannel package](https://github.com/murat-dogan/node-datachannel/tree/master/polyfill) which **does not** come with RxDB but has to be installed before via `npm install node-datachannel --save`.

```ts
import nodeDatachannelPolyfill from 'node-datachannel/polyfill';
const replicationPool = await replicateWebRTC(
{
/* ... */
connectionHandlerCreator: getConnectionHandlerSimplePeer(
'wss://example.com:8080',
nodeDatachannelPolyfill
)
/* ... */
}
);
```

## Live replications

Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@
"array-push-at-sort-position": "4.0.1",
"as-typed": "1.3.2",
"broadcast-channel": "6.0.0",
"convert-hex": "0.1.0",
"crypto-js": "4.2.0",
"custom-idle-queue": "3.0.1",
"dexie": "4.0.0-alpha.4",
Expand All @@ -455,6 +456,7 @@
"nats": "2.18.0",
"oblivious-set": "1.4.0",
"ohash": "1.1.3",
"p2pcf": "1.3.14",
"reconnecting-websocket": "4.4.0",
"simple-peer": "9.11.1",
"socket.io-client": "4.7.2",
Expand Down Expand Up @@ -542,6 +544,7 @@
"mocha": "10.2.0",
"mocha.parallel": "0.15.6",
"nconf": "0.12.1",
"node-datachannel": "0.5.1",
"node-pre-gyp": "0.17.0",
"pre-commit": "1.2.2",
"process": "0.11.10",
Expand All @@ -568,4 +571,4 @@
"webpack-cli": "5.1.4",
"webpack-dev-server": "4.15.1"
}
}
}
166 changes: 97 additions & 69 deletions src/plugins/replication-webrtc/connection-handler-p2pcf.ts
Original file line number Diff line number Diff line change
@@ -1,77 +1,105 @@
// import { Subject } from 'rxjs';
// import { PROMISE_RESOLVE_VOID, randomCouchString } from '../../util';
// import type {
// P2PConnectionHandler,
// P2PConnectionHandlerCreator,
// P2PMessage,
// P2PPeer,
// PeerWithMessage,
// PeerWithResponse
// } from './p2p-types';
import { Subject } from 'rxjs';
import P2PCF from './p2pcf.ts';
import {
PROMISE_RESOLVE_VOID,
randomCouchString
} from '../utils/index.ts';
import type {
PeerWithMessage,
PeerWithResponse,
WebRTCConnectionHandler,
WebRTCConnectionHandlerCreator,
WebRTCMessage,
WebRTCPeer
} from './webrtc-types.ts';
import type {
Instance as SimplePeer,
default as Peer
} from 'simple-peer';
import type {
RxError,
RxTypeError
} from '../../rx-error.ts';

// import P2PCF from 'p2pcf';
/**
* Returns a connection handler that uses the Cloudflare worker signaling server
* @link https://github.com/gfodor/p2pcf
*/
export function getConnectionHandlerP2PCF(
p2pCFOptions: {
workerUrl?: string
} = {}
): WebRTCConnectionHandlerCreator {
// const P2PCF = require('p2pcf');

// /**
// * Returns a connection handler that uses the Cloudflare worker signaling server
// * @link https://github.com/gfodor/p2pcf
// */
// export function getConnectionHandlerP2PCF(
// p2pCFOptions: {
// workerUrl?: string
// } = {}
// ): P2PConnectionHandlerCreator {
// // const P2PCF = require('p2pcf');
const creator: WebRTCConnectionHandlerCreator = async (options) => {
const clientId = randomCouchString(10);
const p2p2 = new P2PCF(clientId, options.topic, {
fastPollingRateMs: 100,
slowPollingRateMs: 200,
idlePollingAfterMs: 1000
});
p2p2.start();

// const creator: P2PConnectionHandlerCreator = (options) => {
// const clientId = randomCouchString(10);
// const p2p2 = new P2PCF(clientId, options.topic, p2pCFOptions);
const connect$ = new Subject<WebRTCPeer>();
const disconnect$ = new Subject<WebRTCPeer>();
const error$ = new Subject<RxError | RxTypeError>();
const message$ = new Subject<PeerWithMessage>();
const response$ = new Subject<PeerWithResponse>();

// const connect$ = new Subject<P2PPeer>();
// p2p2.on('peerconnect', (peer) => connect$.next(peer as any));
p2p2.on('peerconnect', (peer: SimplePeer) =>{
console.log('peerconnect!');
connect$.next(peer as any);
});
p2p2.on('connect', (peer: SimplePeer) =>{
console.log('connect!');
connect$.next(peer as any);
});

// const disconnect$ = new Subject<P2PPeer>();
// p2p2.on('peerclose', (peer) => disconnect$.next(peer as any));
p2p2.on('peerclose', (peer: SimplePeer) => {
console.log('peerclose');
disconnect$.next(peer as any);
});

// const message$ = new Subject<PeerWithMessage>();
// const response$ = new Subject<PeerWithResponse>();
// p2p2.on('msg', (peer, messageOrResponse) => {
// if (messageOrResponse.result) {
// response$.next({
// peer: peer as any,
// response: messageOrResponse
// });
// } else {
// message$.next({
// peer: peer as any,
// message: messageOrResponse
// });
// }
p2p2.on('msg', (peer: SimplePeer, messageOrResponse: any) => {
if (messageOrResponse.result) {
response$.next({
peer: peer as any,
response: messageOrResponse
});
} else {
message$.next({
peer: peer as any,
message: messageOrResponse
});
}

// });
});

// const handler: P2PConnectionHandler = {
// connect$,
// disconnect$,
// message$,
// response$,
// async send(peer: P2PPeer, message: P2PMessage) {
// const [responsePeer, response] = await p2p2.send(peer as any, message);
// return {
// peer: responsePeer,
// response
// } as any;
// },
// destroy() {
// p2p2.destroy();
// connect$.complete();
// disconnect$.complete();
// message$.complete();
// response$.complete();
// return PROMISE_RESOLVE_VOID;
// }
// }
// p2p2.start();
// return handler;
// };
// return creator;
// }
const handler: WebRTCConnectionHandler = {
error$,
connect$,
disconnect$,
message$,
response$,
async send(peer: WebRTCPeer, message: WebRTCMessage) {
const [responsePeer, response] = await p2p2.send(peer as any, message) as any;
return {
peer: responsePeer,
response
} as any;
},
destroy() {
p2p2.destroy();
connect$.complete();
disconnect$.complete();
message$.complete();
response$.complete();
return PROMISE_RESOLVE_VOID;
}
}
p2p2.start();
return Promise.resolve(handler);
};
return creator;
}
4 changes: 2 additions & 2 deletions src/plugins/replication-webrtc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,6 @@ export class RxWebRTCReplicationPool<RxDocType> {

export * from './webrtc-helper.ts';
export * from './webrtc-types.ts';
// export * from './connection-handler-webtorrent';
// export * from './connection-handler-p2pcf';
// export * from './connection-handler-webtorrent.ts';
export * from './connection-handler-p2pcf.ts';
export * from './connection-handler-simple-peer.ts';
Loading
Loading