Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-1034] Emit Live Objects lifecycle events #1958

Open
wants to merge 4 commits into
base: DTP-1147/fix-flaky-liveobjects-tests
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import type EventEmitter from 'common/lib/util/eventemitter';
import { LiveObjects } from './liveobjects';
import { StateMessage, StateObject, StateOperation } from './statemessage';

enum LiveObjectEvents {
Updated = 'Updated',
export enum LiveObjectSubscriptionEvent {
updated = 'updated',
}

export interface LiveObjectData {
Expand All @@ -25,12 +25,23 @@ export interface SubscribeResponse {
unsubscribe(): void;
}

export enum LiveObjectLifecycleEvent {
deleted = 'deleted',
}

export type LiveObjectLifecycleEventCallback = () => void;

export interface OnLiveObjectLifecycleEventResponse {
off(): void;
}

export abstract class LiveObject<
TData extends LiveObjectData = LiveObjectData,
TUpdate extends LiveObjectUpdate = LiveObjectUpdate,
> {
protected _client: BaseClient;
protected _eventEmitter: EventEmitter;
protected _subscriptions: EventEmitter;
protected _lifecycleEvents: EventEmitter;
protected _objectId: string;
/**
* Represents an aggregated value for an object, which combines the initial value for an object from the create operation,
Expand All @@ -55,7 +66,8 @@ export abstract class LiveObject<
objectId: string,
) {
this._client = this._liveObjects.getClient();
this._eventEmitter = new this._client.EventEmitter(this._client.logger);
this._subscriptions = new this._client.EventEmitter(this._client.logger);
this._lifecycleEvents = new this._client.EventEmitter(this._client.logger);
this._objectId = objectId;
this._dataRef = this._getZeroValueData();
// use empty timeserials vector by default, so any future operation can be applied to this object
Expand All @@ -67,10 +79,10 @@ export abstract class LiveObject<
subscribe(listener: (update: TUpdate) => void): SubscribeResponse {
this._liveObjects.throwIfMissingStateSubscribeMode();

this._eventEmitter.on(LiveObjectEvents.Updated, listener);
this._subscriptions.on(LiveObjectSubscriptionEvent.updated, listener);

const unsubscribe = () => {
this._eventEmitter.off(LiveObjectEvents.Updated, listener);
this._subscriptions.off(LiveObjectSubscriptionEvent.updated, listener);
};

return { unsubscribe };
Expand All @@ -86,12 +98,39 @@ export abstract class LiveObject<
return;
}

this._eventEmitter.off(LiveObjectEvents.Updated, listener);
this._subscriptions.off(LiveObjectSubscriptionEvent.updated, listener);
}

unsubscribeAll(): void {
// can allow calling this public method without checking for state modes on the channel as the result of this method is not dependant on them
this._eventEmitter.off(LiveObjectEvents.Updated);
this._subscriptions.off(LiveObjectSubscriptionEvent.updated);
}

on(event: LiveObjectLifecycleEvent, callback: LiveObjectLifecycleEventCallback): OnLiveObjectLifecycleEventResponse {
// we don't require any specific channel mode to be set to call this public method
this._lifecycleEvents.on(event, callback);

const off = () => {
this._lifecycleEvents.off(event, callback);
};

return { off };
}

off(event: LiveObjectLifecycleEvent, callback: LiveObjectLifecycleEventCallback): void {
// we don't require any specific channel mode to be set to call this public method

// prevent accidentally calling .off without any arguments on an EventEmitter and removing all callbacks
if (this._client.Utils.isNil(event) && this._client.Utils.isNil(callback)) {
return;
}

this._lifecycleEvents.off(event, callback);
}

offAll(): void {
// we don't require any specific channel mode to be set to call this public method
this._lifecycleEvents.off();
}

/**
Expand All @@ -102,7 +141,7 @@ export abstract class LiveObject<
}

/**
* Emits the {@link LiveObjectEvents.Updated} event with provided update object if it isn't a noop.
* Emits the {@link LiveObjectSubscriptionEvent.updated} event with provided update object if it isn't a noop.
*
* @internal
*/
Expand All @@ -112,7 +151,7 @@ export abstract class LiveObject<
return;
}

this._eventEmitter.emit(LiveObjectEvents.Updated, update);
this._subscriptions.emit(LiveObjectSubscriptionEvent.updated, update);
}

/**
Expand All @@ -124,7 +163,7 @@ export abstract class LiveObject<
this._tombstone = true;
this._tombstonedAt = Date.now();
this._dataRef = this._getZeroValueData();
// TODO: emit "deleted" event so that end users get notified about this object getting deleted
this._lifecycleEvents.emit(LiveObjectLifecycleEvent.deleted);
}

/**
Expand Down
123 changes: 101 additions & 22 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,42 @@ import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage, StateOperationAction } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';

enum LiveObjectsEvents {
SyncCompleted = 'SyncCompleted',
export enum LiveObjectsEvent {
syncing = 'syncing',
synced = 'synced',
}

type BatchCallback = (batchContext: BatchContext) => void;
export enum LiveObjectsState {
initialized = 'initialized',
syncing = 'syncing',
synced = 'synced',
}

const StateToEventsMap: Record<LiveObjectsState, LiveObjectsEvent | undefined> = {
initialized: undefined,
syncing: LiveObjectsEvent.syncing,
synced: LiveObjectsEvent.synced,
};

export type LiveObjectsEventCallback = () => void;

export interface OnLiveObjectsEventResponse {
off(): void;
}

export type BatchCallback = (batchContext: BatchContext) => void;

export class LiveObjects {
private _client: BaseClient;
private _channel: RealtimeChannel;
private _state: LiveObjectsState;
// composition over inheritance since we cannot import class directly into plugin code.
// instead we obtain a class type from the client
private _eventEmitter: EventEmitter;
private _eventEmitterInternal: EventEmitter;
// related to RTC10, should have a separate EventEmitter for users of the library
private _eventEmitterPublic: EventEmitter;
private _liveObjectsPool: LiveObjectsPool;
private _syncLiveObjectsDataPool: SyncLiveObjectsDataPool;
private _syncInProgress: boolean;
private _currentSyncId: string | undefined;
private _currentSyncCursor: string | undefined;
private _bufferedStateOperations: StateMessage[];
Expand All @@ -36,10 +57,11 @@ export class LiveObjects {
constructor(channel: RealtimeChannel) {
this._channel = channel;
this._client = channel.client;
this._eventEmitter = new this._client.EventEmitter(this._client.logger);
this._state = LiveObjectsState.initialized;
this._eventEmitterInternal = new this._client.EventEmitter(this._client.logger);
this._eventEmitterPublic = new this._client.EventEmitter(this._client.logger);
this._liveObjectsPool = new LiveObjectsPool(this);
this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this);
this._syncInProgress = true;
this._bufferedStateOperations = [];
}

Expand All @@ -51,9 +73,9 @@ export class LiveObjects {
async getRoot<T extends API.LiveMapType = API.DefaultRoot>(): Promise<LiveMap<T>> {
this.throwIfMissingStateSubscribeMode();

// SYNC is currently in progress, wait for SYNC sequence to finish
if (this._syncInProgress) {
await this._eventEmitter.once(LiveObjectsEvents.SyncCompleted);
// if we're not synced yet, wait for SYNC sequence to finish before returning root
if (this._state !== LiveObjectsState.synced) {
await this._eventEmitterInternal.once(LiveObjectsEvent.synced);
}

return this._liveObjectsPool.get(ROOT_OBJECT_ID) as LiveMap<T>;
Expand Down Expand Up @@ -141,6 +163,33 @@ export class LiveObjects {
return counter;
}

on(event: LiveObjectsEvent, callback: LiveObjectsEventCallback): OnLiveObjectsEventResponse {
// we don't require any specific channel mode to be set to call this public method
this._eventEmitterPublic.on(event, callback);

const off = () => {
this._eventEmitterPublic.off(event, callback);
};

return { off };
}

off(event: LiveObjectsEvent, callback: LiveObjectsEventCallback): void {
// we don't require any specific channel mode to be set to call this public method

// prevent accidentally calling .off without any arguments on an EventEmitter and removing all callbacks
if (this._client.Utils.isNil(event) && this._client.Utils.isNil(callback)) {
return;
}

this._eventEmitterPublic.off(event, callback);
}

offAll(): void {
// we don't require any specific channel mode to be set to call this public method
this._eventEmitterPublic.off();
}

/**
* @internal
*/
Expand All @@ -167,23 +216,26 @@ export class LiveObjects {
*/
handleStateSyncMessages(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void {
const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial);
if (this._currentSyncId !== syncId) {
const newSyncSequence = this._currentSyncId !== syncId;
if (newSyncSequence) {
this._startNewSync(syncId, syncCursor);
}

this._syncLiveObjectsDataPool.applyStateSyncMessages(stateMessages);

// if this is the last (or only) message in a sequence of sync updates, end the sync
if (!syncCursor) {
this._endSync();
// defer the state change event until the next tick if this was a new sync sequence
// to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
this._endSync(newSyncSequence);
}
}

/**
* @internal
*/
handleStateMessages(stateMessages: StateMessage[]): void {
if (this._syncInProgress) {
if (this._state !== LiveObjectsState.synced) {
// The client receives state messages in realtime over the channel concurrently with the SYNC sequence.
// Some of the incoming state messages may have already been applied to the state objects described in
// the SYNC sequence, but others may not; therefore we must buffer these messages so that we can apply
Expand All @@ -206,14 +258,20 @@ export class LiveObjects {
`channel=${this._channel.name}, hasState=${hasState}`,
);

if (hasState) {
const fromInitializedState = this._state === LiveObjectsState.initialized;
if (hasState || fromInitializedState) {
// should always start a new sync sequence if we're in the initialized state, no matter the HAS_STATE flag value.
// this guarantees we emit both "syncing" -> "synced" events in that order.
this._startNewSync();
} else {
// no HAS_STATE flag received on attach, can end SYNC sequence immediately
// and treat it as no state on a channel
}

if (!hasState) {
// if no HAS_STATE flag received on attach, we can end SYNC sequence immediately and treat it as no state on a channel.
this._liveObjectsPool.reset();
this._syncLiveObjectsDataPool.reset();
this._endSync();
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
this._endSync(fromInitializedState);
}
}

Expand Down Expand Up @@ -274,10 +332,10 @@ export class LiveObjects {
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
this._syncInProgress = true;
this._stateChange(LiveObjectsState.syncing, false);
}

private _endSync(): void {
private _endSync(deferStateEvent: boolean): void {
this._applySync();
// should apply buffered state operations after we applied the SYNC data.
// can use regular state messages application logic
Expand All @@ -287,8 +345,7 @@ export class LiveObjects {
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = undefined;
this._currentSyncCursor = undefined;
this._syncInProgress = false;
this._eventEmitter.emit(LiveObjectsEvents.SyncCompleted);
this._stateChange(LiveObjectsState.synced, deferStateEvent);
}

private _parseSyncChannelSerial(syncChannelSerial: string | null | undefined): {
Expand Down Expand Up @@ -407,4 +464,26 @@ export class LiveObjects {
throw new this._client.ErrorInfo(`"${expectedMode}" channel mode must be set for this operation`, 40160, 400);
}
}

private _stateChange(state: LiveObjectsState, deferEvent: boolean): void {
if (this._state === state) {
return;
}

this._state = state;
const event = StateToEventsMap[state];
if (!event) {
return;
}

if (deferEvent) {
this._client.Platform.Config.nextTick(() => {
this._eventEmitterInternal.emit(event);
this._eventEmitterPublic.emit(event);
});
} else {
this._eventEmitterInternal.emit(event);
this._eventEmitterPublic.emit(event);
}
}
}
Loading