Skip to content

Commit

Permalink
Add Batch write API for LiveObjects
Browse files Browse the repository at this point in the history
Resolves DTP-1035
  • Loading branch information
VeskeR committed Jan 23, 2025
1 parent 56d62a8 commit 45c7863
Show file tree
Hide file tree
Showing 6 changed files with 482 additions and 1 deletion.
3 changes: 3 additions & 0 deletions scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ async function checkLiveObjectsPluginFiles() {

// These are the files that are allowed to contribute >= `threshold` bytes to the LiveObjects bundle.
const allowedFiles = new Set([
'src/plugins/liveobjects/batchcontext.ts',
'src/plugins/liveobjects/batchcontextlivecounter.ts',
'src/plugins/liveobjects/batchcontextlivemap.ts',
'src/plugins/liveobjects/index.ts',
'src/plugins/liveobjects/livecounter.ts',
'src/plugins/liveobjects/livemap.ts',
Expand Down
106 changes: 106 additions & 0 deletions src/plugins/liveobjects/batchcontext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import type BaseClient from 'common/lib/client/baseclient';
import type * as API from '../../../ably';
import { BatchContextLiveCounter } from './batchcontextlivecounter';
import { BatchContextLiveMap } from './batchcontextlivemap';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObjects } from './liveobjects';
import { ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';

export class BatchContext {
private _client: BaseClient;
/** Maps object ids to the corresponding batch context object wrappers for Live Objects in the pool */
private _wrappedObjects: Map<string, BatchContextLiveCounter | BatchContextLiveMap<API.LiveMapType>> = new Map();
private _queuedMessages: StateMessage[] = [];
private _isClosed = false;

constructor(
private _liveObjects: LiveObjects,
private _root: LiveMap<API.LiveMapType>,
) {
this._client = _liveObjects.getClient();
this._wrappedObjects.set(this._root.getObjectId(), new BatchContextLiveMap(this, this._liveObjects, this._root));
}

getRoot<T extends API.LiveMapType = API.DefaultRoot>(): BatchContextLiveMap<T> {
this.throwIfClosed();
return this.getWrappedObject(ROOT_OBJECT_ID) as BatchContextLiveMap<T>;
}

/**
* @internal
*/
getWrappedObject(objectId: string): BatchContextLiveCounter | BatchContextLiveMap<API.LiveMapType> | undefined {
if (this._wrappedObjects.has(objectId)) {
return this._wrappedObjects.get(objectId);
}

const originObject = this._liveObjects.getPool().get(objectId);
if (!originObject) {
return undefined;
}

let wrappedObject: BatchContextLiveCounter | BatchContextLiveMap<API.LiveMapType>;
if (originObject instanceof LiveMap) {
wrappedObject = new BatchContextLiveMap(this, this._liveObjects, originObject);
} else if (originObject instanceof LiveCounter) {
wrappedObject = new BatchContextLiveCounter(this, this._liveObjects, originObject);
} else {
throw new this._client.ErrorInfo(
`Unknown Live Object instance type: objectId=${originObject.getObjectId()}`,
50000,
500,
);
}

this._wrappedObjects.set(objectId, wrappedObject);
return wrappedObject;
}

/**
* @internal
*/
throwIfClosed(): void {
if (this.isClosed()) {
throw new this._client.ErrorInfo('Batch is closed', 40000, 400);
}
}

/**
* @internal
*/
isClosed(): boolean {
return this._isClosed;
}

/**
* @internal
*/
close(): void {
this._isClosed = true;
}

/**
* @internal
*/
queueStateMessage(stateMessage: StateMessage): void {
this._queuedMessages.push(stateMessage);
}

/**
* @internal
*/
async flush(): Promise<void> {
try {
this.close();

if (this._queuedMessages.length > 0) {
await this._liveObjects.publish(this._queuedMessages);
}
} finally {
this._wrappedObjects.clear();
this._queuedMessages = [];
}
}
}
38 changes: 38 additions & 0 deletions src/plugins/liveobjects/batchcontextlivecounter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import type BaseClient from 'common/lib/client/baseclient';
import { BatchContext } from './batchcontext';
import { LiveCounter } from './livecounter';
import { LiveObjects } from './liveobjects';

export class BatchContextLiveCounter {
private _client: BaseClient;

constructor(
private _batchContext: BatchContext,
private _liveObjects: LiveObjects,
private _counter: LiveCounter,
) {
this._client = this._liveObjects.getClient();
}

value(): number {
this._batchContext.throwIfClosed();
return this._counter.value();
}

increment(amount: number): void {
this._batchContext.throwIfClosed();
const stateMessage = LiveCounter.createCounterIncMessage(this._liveObjects, this._counter.getObjectId(), amount);
this._batchContext.queueStateMessage(stateMessage);
}

decrement(amount: number): void {
this._batchContext.throwIfClosed();
// do an explicit type safety check here before negating the amount value,
// so we don't unintentionally change the type sent by a user
if (typeof amount !== 'number') {
throw new this._client.ErrorInfo('Counter value decrement should be a number', 40013, 400);
}

this.increment(-amount);
}
}
40 changes: 40 additions & 0 deletions src/plugins/liveobjects/batchcontextlivemap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type * as API from '../../../ably';
import { BatchContext } from './batchcontext';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';

export class BatchContextLiveMap<T extends API.LiveMapType> {
constructor(
private _batchContext: BatchContext,
private _liveObjects: LiveObjects,
private _map: LiveMap<T>,
) {}

get<TKey extends keyof T & string>(key: TKey): T[TKey] | undefined {
this._batchContext.throwIfClosed();
const value = this._map.get(key);
if (value instanceof LiveObject) {
return this._batchContext.getWrappedObject(value.getObjectId()) as T[TKey];
} else {
return value;
}
}

size(): number {
this._batchContext.throwIfClosed();
return this._map.size();
}

set<TKey extends keyof T & string>(key: TKey, value: T[TKey]): void {
this._batchContext.throwIfClosed();
const stateMessage = LiveMap.createMapSetMessage(this._liveObjects, this._map.getObjectId(), key, value);
this._batchContext.queueStateMessage(stateMessage);
}

remove<TKey extends keyof T & string>(key: TKey): void {
this._batchContext.throwIfClosed();
const stateMessage = LiveMap.createMapRemoveMessage(this._liveObjects, this._map.getObjectId(), key);
this._batchContext.queueStateMessage(stateMessage);
}
}
20 changes: 19 additions & 1 deletion src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type BaseClient from 'common/lib/client/baseclient';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import type EventEmitter from 'common/lib/util/eventemitter';
import type * as API from '../../../ably';
import { BatchContext } from './batchcontext';
import { DEFAULTS } from './defaults';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
Expand All @@ -14,6 +15,8 @@ enum LiveObjectsEvents {
SyncCompleted = 'SyncCompleted',
}

type BatchCallback = (batchContext: BatchContext) => void;

export class LiveObjects {
private _client: BaseClient;
private _channel: RealtimeChannel;
Expand Down Expand Up @@ -54,6 +57,21 @@ export class LiveObjects {
return this._liveObjectsPool.get(ROOT_OBJECT_ID) as LiveMap<T>;
}

/**
* Provides access to the synchronous write API for LiveObjects that can be used to batch multiple operations together in a single channel message.
*/
async batch(callback: BatchCallback): Promise<void> {
const root = await this.getRoot();
const context = new BatchContext(this, root);

try {
callback(context);
await context.flush();
} finally {
context.close();
}
}

/**
* Send a MAP_CREATE operation to the realtime system to create a new map object in the pool.
*
Expand Down Expand Up @@ -301,7 +319,7 @@ export class LiveObjects {
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 50000, 500);
throw new this._client.ErrorInfo(`Unknown Live Object type: ${objectType}`, 50000, 500);
}

this._liveObjectsPool.set(objectId, newObject);
Expand Down
Loading

0 comments on commit 45c7863

Please sign in to comment.