Skip to content

Commit

Permalink
Added requestListener & onConnection options to yjs-websocket/server
Browse files Browse the repository at this point in the history
  • Loading branch information
big-camel committed Mar 19, 2023
1 parent 1eeb459 commit 6b7ca31
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 40 deletions.
6 changes: 4 additions & 2 deletions docs/config/yjs.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ startServer({
host: string;
// Port to listen on, default is 1234
port: number;
// http server request listener
requestListener?: http.RequestListener;
// Custom authentication, connection will be terminated if code !== 200 is returned
auth?: (request: http.IncomingMessage, ws: WebSocket) => Promise<void | { code: number; data: string | Buffer }>;
// Persistence options, false means no persistence
Expand All @@ -130,7 +132,7 @@ startServer({
contentField?: string;
// Update callback
callback?: UpdateCallback;
// Initial value
initialValue?: Element;
// Connection callback
onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void;
})
```
6 changes: 4 additions & 2 deletions docs/config/yjs.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ startServer({
host: string;
// 监听的端口,默认为 1234
port: number;
// http server request listener
requestListener?: http.RequestListener;
// 自定义效验,返回 code !== 200 时,会终止连接
auth?: (
request: http.IncomingMessage,
Expand All @@ -131,7 +133,7 @@ startServer({
contentField?: string;
// 更新回调
callback?: UpdateCallback;
// 初始值
initialValue?: Element;
// 连接回调
onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void;
})
```
2 changes: 1 addition & 1 deletion examples/react/editor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export default () => {
onLoad={setEngine}
toc={true}
member={member}
yjs={IS_DEV ? false : yjsConfig}
yjs={false ? false : yjsConfig}
onSave={onSave}
/>
</Context.Provider>
Expand Down
74 changes: 43 additions & 31 deletions plugins/yjs-websocket/src/server/start.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import WebSocket from 'ws';
import http from 'http';
import { Element } from '../model';
import { setupWSConnection, UpdateCallback } from './utils';
import { initPersistence, PersistenceOptions } from './persistence';
import { WSSharedDoc } from './types';

const wss = new WebSocket.Server({ noServer: true });

export interface ServerOptions {
// http server host
host: string;
// http server port
port: number;
// http server request listener
requestListener?: http.RequestListener;
// 效验
auth?: (
request: http.IncomingMessage,
ws: WebSocket,
) => Promise<void | { code: number; data: string | Buffer }>;
// 连接回调
onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void;
// 持久化选项,false 为不持久化
persistenceOptions?: PersistenceOptions | false;
// 文档内容字段,默认为 content
Expand All @@ -24,43 +30,49 @@ export interface ServerOptions {

const SERVER_OPTIONS_WEAKMAP = new WeakMap<http.Server, ServerOptions>();

const server = http.createServer((request, response) => {
response.writeHead(200, { 'Content-Type': 'text/plain' });
response.end('okay');
});

wss.on('connection', (conn, req) => {
const { callback } = SERVER_OPTIONS_WEAKMAP.get(server) ?? {};
setupWSConnection(conn, req, {
callback,
});
});

server.on('upgrade', (request, socket, head) => {
// You may check auth of request here..
// See https://github.com/websockets/ws#client-authentication
const handleAuth = (ws: WebSocket) => {
const { auth = () => Promise.resolve() } =
SERVER_OPTIONS_WEAKMAP.get(server) ?? {};
auth(request, ws).then((res) => {
if (res && res.code !== 200) {
ws.close(res.code, res.data);
} else {
wss.emit('connection', ws, request);
}
});
};
wss.handleUpgrade(request, socket, head, handleAuth);
});

export const startServer = (options: ServerOptions) => {
SERVER_OPTIONS_WEAKMAP.set(server, options);
const {
auth = () => Promise.resolve(),
requestListener,
onConnection,
host,
port,
persistenceOptions = { provider: 'leveldb' },
contentField,
} = options;
const server = http.createServer((request, response) => {
if (requestListener) {
requestListener(request, response);
} else {
response.writeHead(200, { 'Content-Type': 'text/plain' });
response.end('okay');
}
});

SERVER_OPTIONS_WEAKMAP.set(server, options);

wss.on('connection', (conn, req) => {
const { callback } = SERVER_OPTIONS_WEAKMAP.get(server) ?? {};
setupWSConnection(conn, req, {
callback,
onConnection,
});
});

server.on('upgrade', (request, socket, head) => {
// You may check auth of request here..
// See https://github.com/websockets/ws#client-authentication
const handleAuth = (ws: WebSocket) => {
auth(request, ws).then((res) => {
if (res && res.code !== 200) {
ws.close(res.code, res.data);
} else {
wss.emit('connection', ws, request);
}
});
};
wss.handleUpgrade(request, socket, head, handleAuth);
});

if (persistenceOptions !== false) {
initPersistence(persistenceOptions, contentField);
Expand Down
1 change: 1 addition & 0 deletions plugins/yjs-websocket/src/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface WSSharedDoc extends Doc {
name: string;
conns: Map<WebSocket, Set<number>>;
awareness: awarenessProtocol.Awareness;
sendCustomMessage: (conn: WebSocket, message: Record<string, any>) => void;
}

export type ContentType = 'Array' | 'Map' | 'Text' | 'XmlFragment';
20 changes: 16 additions & 4 deletions plugins/yjs-websocket/src/server/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ class WSSharedDoc extends Y.Doc implements WSSharedDocInterface {
}
}

sendCustomMessage(conn: WebSocket.WebSocket, message: Record<string, any>) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageCustom);
encoding.writeAny(encoder, message);
send(this, conn, encoding.toUint8Array(encoder));
}

destroy(): void {
super.destroy();
}
Expand Down Expand Up @@ -248,6 +255,10 @@ interface SetupWSConnectionOptions {
gc?: boolean;
pingTimeout?: number;
callback?: UpdateCallback;
onConnection?: (
doc: WSSharedDocInterface,
conn: WebSocket.WebSocket,
) => void;
}

export const setupWSConnection = (
Expand All @@ -260,23 +271,24 @@ export const setupWSConnection = (
gc = true,
pingTimeout = 30000,
callback,
onConnection,
} = options ?? {};
conn.binaryType = 'arraybuffer';
// get doc, initialize if it does not exist yet
const doc = getYDoc(
docName,
gc,
() => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageCustom);
encoding.writeAny(encoder, {
doc.sendCustomMessage(conn, {
action: 'initValue',
});
send(doc, conn, encoding.toUint8Array(encoder));
},
callback,
);
doc.conns.set(conn, new Set());
if (onConnection) {
onConnection(doc, conn);
}
// listen and reply to events
conn.on('message', (message: ArrayBuffer) =>
messageListener(conn, doc, new Uint8Array(message)),
Expand Down

0 comments on commit 6b7ca31

Please sign in to comment.