Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-1138] Add object-level write API for LiveMap/LiveCounter creation #1950

Draft
wants to merge 14 commits into
base: liveobjects/object-mutation-api
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { gzip } from 'zlib';
import Table from 'cli-table';

// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 101, gzip: 31 };
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 102, gzip: 31 };

const baseClientNames = ['BaseRest', 'BaseRealtime'];

Expand Down
33 changes: 9 additions & 24 deletions src/common/lib/client/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ class Auth {
capability = tokenParams.capability || '';

if (!request.timestamp) {
request.timestamp = await this.getTimestamp(authOptions && authOptions.queryTime);
request.timestamp = await this._getTimestamp(authOptions && authOptions.queryTime);
}

/* nonce */
Expand Down Expand Up @@ -832,28 +832,6 @@ class Auth {
}
}

/**
* Get the current time based on the local clock,
* or if the option queryTime is true, return the server time.
* The server time offset from the local time is stored so that
* only one request to the server to get the time is ever needed
*/
async getTimestamp(queryTime: boolean): Promise<number> {
if (!this.isTimeOffsetSet() && (queryTime || this.authOptions.queryTime)) {
return this.client.time();
} else {
return this.getTimestampUsingOffset();
}
}

getTimestampUsingOffset() {
return Date.now() + (this.client.serverTimeOffset || 0);
}

isTimeOffsetSet() {
return this.client.serverTimeOffset !== null;
}

_saveBasicOptions(authOptions: AuthOptions) {
this.method = 'basic';
this.key = authOptions.key;
Expand Down Expand Up @@ -913,7 +891,7 @@ class Auth {
/* RSA4b1 -- if we have a server time offset set already, we can
* automatically remove expired tokens. Else just use the cached token. If it is
* expired Ably will tell us and we'll discard it then. */
if (!this.isTimeOffsetSet() || !token.expires || token.expires >= this.getTimestampUsingOffset()) {
if (!this.client.isTimeOffsetSet() || !token.expires || token.expires >= this.client.getTimestampUsingOffset()) {
Logger.logAction(
this.logger,
Logger.LOG_MINOR,
Expand Down Expand Up @@ -1020,6 +998,13 @@ class Auth {
): Promise<TokenRevocationResult> {
return this.client.rest.revokeTokens(specifiers, options);
}

/**
* Same as {@link BaseClient.getTimestamp} but also takes into account {@link Auth.authOptions}
*/
private async _getTimestamp(queryTime: boolean): Promise<number> {
return this.client.getTimestamp(queryTime || !!this.authOptions.queryTime);
}
}

export default Auth;
22 changes: 22 additions & 0 deletions src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,28 @@ class BaseClient {
this.logger.setLog(logOptions.level, logOptions.handler);
}

/**
* Get the current time based on the local clock,
* or if the option queryTime is true, return the server time.
* The server time offset from the local time is stored so that
* only one request to the server to get the time is ever needed
*/
async getTimestamp(queryTime: boolean): Promise<number> {
if (!this.isTimeOffsetSet() && queryTime) {
return this.time();
}

return this.getTimestampUsingOffset();
}

getTimestampUsingOffset(): number {
return Date.now() + (this.serverTimeOffset || 0);
}

isTimeOffsetSet(): boolean {
return this.serverTimeOffset !== null;
}

static Platform = Platform;

/**
Expand Down
2 changes: 2 additions & 0 deletions src/common/types/IBufferUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export default interface IBufferUtils<Bufferlike, Output, ToBufferOutput> {
toBuffer: (buffer: Bufferlike) => ToBufferOutput;
toArrayBuffer: (buffer: Bufferlike) => ArrayBuffer;
base64Encode: (buffer: Bufferlike) => string;
base64UrlEncode: (buffer: Bufferlike) => string;
base64Decode: (string: string) => Output;
hexEncode: (buffer: Bufferlike) => string;
hexDecode: (string: string) => Output;
Expand All @@ -19,5 +20,6 @@ export default interface IBufferUtils<Bufferlike, Output, ToBufferOutput> {
* Returns ArrayBuffer on browser and Buffer on Node.js
*/
arrayBufferViewToBuffer: (arrayBufferView: ArrayBufferView) => Bufferlike;
sha256(message: Bufferlike): Output;
hmacSha256(message: Bufferlike, key: Bufferlike): Output;
}
15 changes: 11 additions & 4 deletions src/platform/nodejs/lib/util/bufferutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return this.toBuffer(buffer).toString('base64');
}

base64UrlEncode(buffer: Bufferlike): string {
return this.toBuffer(buffer).toString('base64url');
}

areBuffersEqual(buffer1: Bufferlike, buffer2: Bufferlike): boolean {
if (!buffer1 || !buffer2) return false;
return this.toBuffer(buffer1).compare(this.toBuffer(buffer2)) == 0;
Expand Down Expand Up @@ -70,14 +74,17 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return Buffer.from(string, 'utf8');
}

sha256(message: Bufferlike): Output {
const messageBuffer = this.toBuffer(message);

return crypto.createHash('SHA256').update(messageBuffer).digest();
}

hmacSha256(message: Bufferlike, key: Bufferlike): Output {
const messageBuffer = this.toBuffer(message);
const keyBuffer = this.toBuffer(key);

const hmac = crypto.createHmac('SHA256', keyBuffer);
hmac.update(messageBuffer);

return hmac.digest();
return crypto.createHmac('SHA256', keyBuffer).update(messageBuffer).digest();
}
}

Expand Down
12 changes: 11 additions & 1 deletion src/platform/web/lib/util/bufferutils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Platform from 'common/platform';
import IBufferUtils from 'common/types/IBufferUtils';
import { hmac as hmacSha256 } from './hmac-sha256';
import { hmac as hmacSha256, sha256 } from './hmac-sha256';

/* Most BufferUtils methods that return a binary object return an ArrayBuffer
* The exception is toBuffer, which returns a Uint8Array */
Expand Down Expand Up @@ -116,6 +116,11 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return this.uint8ViewToBase64(this.toBuffer(buffer));
}

base64UrlEncode(buffer: Bufferlike): string {
// base64url encoding is based on regular base64 with following changes: https://base64.guru/standards/base64url
return this.base64Encode(buffer).replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');
}

base64Decode(str: string): Output {
if (ArrayBuffer && Platform.Config.atob) {
return this.base64ToArrayBuffer(str);
Expand Down Expand Up @@ -195,6 +200,11 @@ class BufferUtils implements IBufferUtils<Bufferlike, Output, ToBufferOutput> {
return this.toArrayBuffer(arrayBufferView);
}

sha256(message: Bufferlike): Output {
const hash = sha256(this.toBuffer(message));
return this.toArrayBuffer(hash);
}

hmacSha256(message: Bufferlike, key: Bufferlike): Output {
const hash = hmacSha256(this.toBuffer(key), this.toBuffer(message));
return this.toArrayBuffer(hash);
Expand Down
4 changes: 2 additions & 2 deletions src/platform/web/lib/util/hmac-sha256.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ function rightRotate(word: number, bits: number) {
return (word >>> bits) | (word << (32 - bits));
}

function sha256(data: Uint8Array) {
export function sha256(data: Uint8Array): Uint8Array {
// Copy default state
var STATE = DEFAULT_STATE.slice();

Expand Down Expand Up @@ -185,7 +185,7 @@ function sha256(data: Uint8Array) {
);
}

export function hmac(key: Uint8Array, data: Uint8Array) {
export function hmac(key: Uint8Array, data: Uint8Array): Uint8Array {
if (key.length > 64) key = sha256(key);

if (key.length < 64) {
Expand Down
105 changes: 86 additions & 19 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage';

export interface LiveCounterData extends LiveObjectData {
Expand Down Expand Up @@ -32,47 +33,113 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
return obj;
}

value(): number {
return this._dataRef.data;
}

/**
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
*
* This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when
* the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
* Returns a {@link LiveCounter} instance based on the provided state operation.
* The provided state operation must hold a valid counter object data.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
* @internal
*/
async increment(amount: number): Promise<void> {
const stateMessage = this.createCounterIncMessage(amount);
return this._liveObjects.publish([stateMessage]);
static fromStateOperation(liveobjects: LiveObjects, stateOperation: StateOperation): LiveCounter {
const obj = new LiveCounter(liveobjects, stateOperation.objectId);
obj._mergeInitialDataFromCreateOperation(stateOperation);
return obj;
}

/**
* @internal
*/
createCounterIncMessage(amount: number): StateMessage {
static createCounterIncMessage(liveObjects: LiveObjects, objectId: string, amount: number): StateMessage {
const client = liveObjects.getClient();

if (typeof amount !== 'number' || !isFinite(amount)) {
throw new this._client.ErrorInfo('Counter value increment should be a valid number', 40013, 400);
throw new client.ErrorInfo('Counter value increment should be a valid number', 40013, 400);
}

const stateMessage = StateMessage.fromValues(
{
operation: {
action: StateOperationAction.COUNTER_INC,
objectId: this.getObjectId(),
objectId,
counterOp: { amount },
},
},
this._client.Utils,
this._client.MessageEncoding,
client.Utils,
client.MessageEncoding,
);

return stateMessage;
}

/**
* @internal
*/
static async createCounterCreateMessage(liveObjects: LiveObjects, count?: number): Promise<StateMessage> {
const client = liveObjects.getClient();

if (count !== undefined && (typeof count !== 'number' || !Number.isFinite(count))) {
throw new client.ErrorInfo('Counter value should be a valid number', 40013, 400);
}

const initialValueObj = LiveCounter.createInitialValueObject(count);
const { encodedInitialValue, format } = StateMessage.encodeInitialValue(client.Utils, initialValueObj);
const nonce = client.Utils.cheapRandStr();
const msTimestamp = await client.getTimestamp(true);

const objectId = ObjectId.fromInitialValue(
client.Platform,
'counter',
encodedInitialValue,
nonce,
msTimestamp,
).toString();

const stateMessage = StateMessage.fromValues(
{
operation: {
...initialValueObj,
action: StateOperationAction.COUNTER_CREATE,
objectId,
nonce,
initialValue: encodedInitialValue,
initialValueEncoding: format,
},
},
client.Utils,
client.MessageEncoding,
);

return stateMessage;
}

/**
* @internal
*/
static createInitialValueObject(count?: number): Pick<StateOperation, 'counter'> {
return {
counter: {
count: count ?? 0,
},
};
}

value(): number {
return this._dataRef.data;
}

/**
* Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object.
*
* This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when
* the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular
* operation application procedure.
*
* @returns A promise which resolves upon receiving the ACK message for the published operation message.
*/
async increment(amount: number): Promise<void> {
const stateMessage = LiveCounter.createCounterIncMessage(this._liveObjects, this.getObjectId(), amount);
return this._liveObjects.publish([stateMessage]);
}

/**
* Alias for calling {@link LiveCounter.increment | LiveCounter.increment(-amount)}
*/
Expand Down Expand Up @@ -185,7 +252,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
this._siteTimeserials = stateObject.siteTimeserials ?? {};

if (this.isTombstoned()) {
// this object is tombstoned. this is a terminal state which can't be overriden. skip the rest of state object message processing
// this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of state object message processing
return { noop: true };
}

Expand Down
Loading
Loading