Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): new worker workspace engine #9257

Draft
wants to merge 1 commit into
base: canary
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./worker": "./src/worker/index.ts",
"./worker/client": "./src/worker/client.ts",
"./worker/consumer": "./src/worker/consumer.ts",
"./idb": "./src/impls/idb/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts",
"./cloud": "./src/impls/cloud/index.ts",
Expand Down
168 changes: 166 additions & 2 deletions packages/common/nbstore/src/frontend/doc.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { groupBy } from 'lodash-es';
import { nanoid } from 'nanoid';
import { Subject } from 'rxjs';
import {
combineLatest,
map,
Observable,
of,
Subject,
Subscription,
} from 'rxjs';
import {
applyUpdate,
type Doc as YDoc,
Expand All @@ -9,7 +16,7 @@ import {
} from 'yjs';

import type { DocRecord, DocStorage } from '../storage';
import type { DocSync } from '../sync/doc';
import type { DocSync, DocSyncDocState, DocSyncState } from '../sync/doc';
import { AsyncPriorityQueue } from '../utils/async-priority-queue';
import { isEmptyUpdate } from '../utils/is-empty-update';
import { throwIfAborted } from '../utils/throw-if-aborted';
Expand All @@ -36,6 +43,60 @@ interface DocFrontendOptions {
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}

export type DocFrontendDocState = {
/**
* some data is available in yjs doc instance
*/
ready: boolean;
/**
* data is loaded from local doc storage and applied to yjs doc instance
*/
loaded: boolean;
/**
* some data is being applied to yjs doc instance, or some data is being saved to local doc storage
*/
updating: boolean;
/**
* the doc is syncing with remote peers
*/
syncing: boolean;
/**
* the doc is retrying to sync with remote peers
*/
syncRetrying: boolean;
/**
* the error message when syncing with remote peers
*/
syncErrorMessage: string | null;
};

export type DocFrontendState = {
/**
* total number of docs
*/
total: number;
/**
* number of docs that have been loaded to yjs doc instance
*/
loaded: number;
/**
* number of docs that are syncing with remote peers
*/
syncing: number;
/**
* whether all docs are synced with remote peers
*/
synced: boolean;
/**
* whether the doc is retrying to sync with remote peers
*/
syncRetrying: boolean;
/**
* the error message when syncing with remote peers
*/
syncErrorMessage: string | null;
};

export class DocFrontend {
private readonly uniqueId = `frontend:${nanoid()}`;

Expand All @@ -60,6 +121,63 @@ export class DocFrontend {
readonly options: DocFrontendOptions = {}
) {}

docState$(docId: string): Observable<DocFrontendDocState> {
const frontendState$ = new Observable<{
ready: boolean;
loaded: boolean;
updating: boolean;
}>(subscribe => {
const next = () => {
subscribe.next({
ready: this.status.readyDocs.has(docId) ?? false,
loaded: this.status.connectedDocs.has(docId),
updating:
(this.status.jobMap.get(docId)?.length ?? 0) > 0 ||
this.status.currentJob?.docId === docId,
});
};
next();
return this.statusUpdatedSubject$.subscribe(updatedId => {
if (updatedId === docId) next();
});
});
const syncState$ =
this.sync?.docState$(docId) ?? of<DocSyncDocState | undefined>(undefined);
return combineLatest([frontendState$, syncState$]).pipe(
map(([frontend, sync]) => ({
...frontend,
syncing: sync?.syncing ?? false,
syncRetrying: sync?.retrying ?? false,
syncErrorMessage: sync?.errorMessage ?? null,
}))
);
}

state$ = combineLatest([
new Observable<{ total: number; loaded: number }>(subscriber => {
const next = () => {
subscriber.next({
total: this.status.docs.size,
loaded: this.status.connectedDocs.size,
});
};
next();
return this.statusUpdatedSubject$.subscribe(() => {
next();
});
}),
this.sync?.state$ ?? of<DocSyncState | undefined>(undefined),
]).pipe(
map(([frontend, sync]) => ({
total: sync?.total ?? frontend.total,
loaded: frontend.loaded,
syncing: sync?.syncing ?? 0,
synced: sync?.synced ?? false,
syncRetrying: sync?.retrying ?? false,
syncErrorMessage: sync?.errorMessage ?? null,
}))
) satisfies Observable<DocFrontendState>;

start() {
if (this.abort.signal.aborted) {
throw new Error('doc frontend can only start once');
Expand Down Expand Up @@ -314,4 +432,50 @@ export class DocFrontend {

return merge(updates.filter(bin => !isEmptyUpdate(bin)));
}

async waitForSynced(abort?: AbortSignal) {
let sub: Subscription | undefined = undefined;
return Promise.race([
new Promise<void>(resolve => {
sub = this.state$?.subscribe(status => {
if (status.synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]).finally(() => {
sub?.unsubscribe();
});
}

async waitForDocReady(docId: string, abort?: AbortSignal) {
let sub: Subscription | undefined = undefined;
return Promise.race([
new Promise<void>(resolve => {
sub = this.docState$(docId).subscribe(state => {
if (state.ready) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]).finally(() => {
sub?.unsubscribe();
});
}
}
3 changes: 3 additions & 0 deletions packages/common/nbstore/src/frontend/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './awareness';
export * from './blob';
export * from './doc';
2 changes: 2 additions & 0 deletions packages/common/nbstore/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from './connection';
export * from './frontend';
export * from './storage';
export * from './sync';
6 changes: 4 additions & 2 deletions packages/common/nbstore/src/sync/doc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { DocSyncPeer } from './peer';
export interface DocSyncState {
total: number;
syncing: number;
synced: boolean;
retrying: boolean;
errorMessage: string | null;
}
Expand All @@ -33,8 +34,9 @@ export class DocSyncImpl implements DocSync {
this.peers.map(peer => peer.peerState$)
).pipe(
map(allPeers => ({
total: allPeers.reduce((acc, peer) => acc + peer.total, 0),
syncing: allPeers.reduce((acc, peer) => acc + peer.syncing, 0),
total: allPeers.reduce((acc, peer) => Math.max(acc, peer.total), 0),
syncing: allPeers.reduce((acc, peer) => Math.max(acc, peer.syncing), 0),
synced: allPeers.every(peer => peer.synced),
retrying: allPeers.some(peer => peer.retrying),
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
Expand Down
12 changes: 9 additions & 3 deletions packages/common/nbstore/src/sync/doc/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ interface PeerState {
total: number;
syncing: number;
retrying: boolean;
synced: boolean;
errorMessage: string | null;
}

interface PeerDocState {
syncing: boolean;
synced: boolean;
retrying: boolean;
errorMessage: string | null;
}
Expand Down Expand Up @@ -121,6 +123,7 @@ export class DocSyncPeer {
subscribe.next({
total: this.status.docs.size,
syncing: this.status.docs.size,
synced: false,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
Expand All @@ -131,6 +134,7 @@ export class DocSyncPeer {
syncing: syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
synced: syncing === 0,
});
}
};
Expand All @@ -143,10 +147,12 @@ export class DocSyncPeer {
docState$(docId: string) {
return new Observable<PeerDocState>(subscribe => {
const next = () => {
const syncing =
!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId);
subscribe.next({
syncing:
!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId),
syncing: syncing,
synced: !syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
Expand Down
2 changes: 2 additions & 0 deletions packages/common/nbstore/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { AwarenessSyncImpl } from './awareness';
import { BlobSyncImpl } from './blob';
import { DocSyncImpl, type DocSyncState } from './doc';

export type { DocSyncDocState, DocSyncState } from './doc';

export interface SyncState {
doc?: DocSyncState;
}
Expand Down
74 changes: 10 additions & 64 deletions packages/common/nbstore/src/worker/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { OpClient } from '@toeverything/infra/op';

import { DummyConnection } from '../connection';
import { DocFrontend } from '../frontend/doc';
import { AwarenessFrontend, BlobFrontend, DocFrontend } from '../frontend';
import {
type AwarenessRecord,
type AwarenessStorage,
Expand All @@ -19,23 +19,26 @@ import type { BlobSync } from '../sync/blob';
import type { DocSync } from '../sync/doc';
import type { WorkerOps } from './ops';

export type { WorkerInitOptions } from './ops';

export class WorkerClient {
constructor(
private readonly client: OpClient<WorkerOps>,
private readonly options: StorageOptions
) {}

readonly docStorage = new WorkerDocStorage(this.client, this.options);
readonly blobStorage = new WorkerBlobStorage(this.client, this.options);
readonly awarenessStorage = new WorkerAwarenessStorage(
private readonly docStorage = new WorkerDocStorage(this.client, this.options);
private readonly blobStorage = new WorkerBlobStorage(
this.client,
this.options
);
readonly docSync = new WorkerDocSync(this.client);
readonly blobSync = new WorkerBlobSync(this.client);
readonly awarenessSync = new WorkerAwarenessSync(this.client);
private readonly docSync = new WorkerDocSync(this.client);
private readonly blobSync = new WorkerBlobSync(this.client);
private readonly awarenessSync = new WorkerAwarenessSync(this.client);

readonly docFrontend = new DocFrontend(this.docStorage, this.docSync);
readonly blobFrontend = new BlobFrontend(this.blobStorage, this.blobSync);
readonly awarenessFrontend = new AwarenessFrontend(this.awarenessSync);
}

class WorkerDocStorage implements DocStorage {
Expand Down Expand Up @@ -156,63 +159,6 @@ class WorkerBlobStorage implements BlobStorage {
connection = new DummyConnection();
}

class WorkerAwarenessStorage implements AwarenessStorage {
constructor(
private readonly client: OpClient<WorkerOps>,
private readonly options: StorageOptions
) {}

readonly storageType = 'awareness';
readonly peer = this.options.peer;
readonly spaceType = this.options.type;
readonly spaceId = this.options.id;
readonly universalId = universalId(this.options);

update(record: AwarenessRecord, origin?: string): Promise<void> {
return this.client.call('awarenessStorage.update', {
awareness: record,
origin,
});
}
subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => Promise<AwarenessRecord | null>
): () => void {
const subscription = this.client
.ob$('awarenessStorage.subscribeUpdate', id)
.subscribe({
next: update => {
if (update.type === 'awareness-update') {
onUpdate(update.awareness, update.origin);
}
if (update.type === 'awareness-collect') {
onCollect()
.then(record => {
if (record) {
this.client
.call('awarenessStorage.collect', {
awareness: record,
collectId: update.collectId,
})
.catch(err => {
console.error('error feedback collected awareness', err);
});
}
})
.catch(err => {
console.error('error collecting awareness', err);
});
}
},
});
return () => {
subscription.unsubscribe();
};
}
connection = new DummyConnection();
}

class WorkerDocSync implements DocSync {
constructor(private readonly client: OpClient<WorkerOps>) {}

Expand Down
Loading
Loading