Skip to content

Commit

Permalink
Reimplement recovery path and add function versioning (#69)
Browse files Browse the repository at this point in the history
* Reimplement recovery path and add function versioning

* Add docstring comments

* Add version test
  • Loading branch information
dfarr authored Mar 21, 2024
1 parent 6dc3579 commit 84def39
Show file tree
Hide file tree
Showing 9 changed files with 388 additions and 78 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ rules:
- vars: all
args: none

require-yield: off

parserOptions:
project: "./tsconfig.json"

Expand Down
72 changes: 59 additions & 13 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,31 @@ export class Resonate extends ResonateBase {
register<F extends AFunc>(
name: string,
func: F,
opts: Partial<Options> = {},
): (id: string, ...args: Params<F>) => ResonatePromise<Return<F>> {
return super.register(name, func, opts);
opts?: Partial<Options>,
): (id: string, ...args: any) => ResonatePromise<Return<F>>;

/**
* Register a function with Resonate. Registered functions can be invoked by calling {@link run}, or by the returned function.
*
* @template F The type of the function.
* @param name A unique name to identify the function.
* @param version Version of the function.
* @param func The function to register with Resonate.
* @param opts Resonate options, can be constructed by calling {@link options}.
* @returns Resonate function
*/
register<F extends AFunc>(
name: string,
version: number,
func: F,
opts?: Partial<Options>,
): (id: string, ...args: any) => ResonatePromise<Return<F>>;
register<F extends AFunc>(
name: string,
funcOrVersion: F | number,
funcOrOpts: F | Partial<Options>,
): (id: string, ...args: any) => ResonatePromise<Return<F>> {
return super.register(name, funcOrVersion, funcOrOpts);
}

/**
Expand Down Expand Up @@ -83,6 +105,13 @@ export class Resonate extends ResonateBase {
export class Info {
constructor(private invocation: Invocation<any>) {}

/**
* The running count of function execution attempts.
*/
get attempt() {
return this.invocation.attempt;
}

/**
* Uniquely identifies the function invocation.
*/
Expand All @@ -105,16 +134,23 @@ export class Info {
}

/**
* The running count of function execution attempts.
* The resonate function version.
*/
get attempt() {
return this.invocation.attempt;
get version() {
return this.invocation.version;
}
}

export class Context {
constructor(private invocation: Invocation<any>) {}

/**
* The running count of child function invocations.
*/
get counter() {
return this.invocation.counter;
}

/**
* Uniquely identifies the function invocation.
*/
Expand All @@ -137,10 +173,10 @@ export class Context {
}

/**
* The running count of child function invocations.
* The resonate function version.
*/
get counter() {
return this.invocation.counter;
get version() {
return this.invocation.version;
}

/**
Expand Down Expand Up @@ -176,7 +212,7 @@ export class Context {
const { args, opts } = this.invocation.split(argsWithOpts);

// create a new invocation
const invocation = new Invocation(id, idempotencyKey, opts, this.invocation);
const invocation = new Invocation(id, idempotencyKey, undefined, opts, this.invocation.version, this.invocation);

let execution: Execution<any>;
if (typeof func === "string") {
Expand Down Expand Up @@ -206,13 +242,13 @@ export class Context {
* @param args The function arguments, optionally followed by {@link options}.
* @returns A promise that resolves to the return value of the function.
*/
io<F extends IFunc>(func: F, ...args: [...Params<F>, PartialOptions?]): Promise<Return<F>>;
io<F extends IFunc>(func: F, ...args: [...Params<F>, PartialOptions?]): ResonatePromise<Return<F>>;
io(func: (...args: any[]) => any, ...argsWithOpts: any[]): ResonatePromise<any> {
const id = `${this.invocation.id}.${this.invocation.counter}`;
const idempotencyKey = utils.hash(id);
const { args, opts } = this.invocation.split(argsWithOpts);

const invocation = new Invocation(id, idempotencyKey, opts, this.invocation);
const invocation = new Invocation(id, idempotencyKey, undefined, opts, this.invocation.version, this.invocation);
const info = new Info(invocation);

// create an ordinary execution
Expand Down Expand Up @@ -262,8 +298,18 @@ class Scheduler {
// the idempotency key is a hash of the id
const idempotencyKey = utils.hash(id);

// params, used for recovery
const param = {
func: name,
version,
args,
};

// add an invocation tag
opts.tags["resonate:invocation"] = "true";

// create a new invocation
const invocation = new Invocation<Return<F>>(id, idempotencyKey, opts);
const invocation = new Invocation<Return<F>>(id, idempotencyKey, param, opts, version);

// create a new execution
const ctx = new Context(invocation);
Expand Down
81 changes: 48 additions & 33 deletions lib/core/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,6 @@ export class OrdinaryExecution<T> extends Execution<T> {
super(invocation);
}

private async invoke(): Promise<T> {
let error;

// invoke the function according to the retry policy
for (const delay of this.retry.iterator(this.invocation)) {
try {
await new Promise((resolve) => setTimeout(resolve, delay));
return await this.func();
} catch (e) {
error = e;

// bump the attempt count
this.invocation.attempt++;
}
}

// if all attempts fail throw the last error
throw error;
}

protected async fork() {
try {
// create a durable promise
Expand All @@ -64,22 +44,26 @@ export class OrdinaryExecution<T> extends Execution<T> {
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{ idempotencyKey: this.invocation.idempotencyKey },
{
idempotencyKey: this.invocation.idempotencyKey,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
);

if (promise.pending) {
// if pending, invoke the function and resolve/reject the durable promise
await this.invoke().then(
await this.run().then(
(v) => promise.resolve(v, { idempotencyKey: this.invocation.idempotencyKey }),
(e) => promise.reject(e, { idempotencyKey: this.invocation.idempotencyKey }),
);
}

// resolve/reject the invocation
if (promise.resolved) {
this.invocation.resolve(promise.value);
this.invocation.resolve(promise.value());
} else if (promise.rejected || promise.canceled || promise.timedout) {
this.invocation.reject(promise.error);
this.invocation.reject(promise.error());
}
} catch (e) {
// if an error occurs, kill the invocation
Expand All @@ -92,6 +76,26 @@ export class OrdinaryExecution<T> extends Execution<T> {
protected async join(future: Future<T>) {
return await future.promise;
}

private async run(): Promise<T> {
let error;

// invoke the function according to the retry policy
for (const delay of this.retry.iterator(this.invocation)) {
try {
await new Promise((resolve) => setTimeout(resolve, delay));
return await this.func();
} catch (e) {
error = e;

// bump the attempt count
this.invocation.attempt++;
}
}

// if all attempts fail throw the last error
throw error;
}
}

export class DeferredExecution<T> extends Execution<T> {
Expand All @@ -107,11 +111,18 @@ export class DeferredExecution<T> extends Execution<T> {
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{ idempotencyKey: this.invocation.idempotencyKey, poll: true },
{
idempotencyKey: this.invocation.idempotencyKey,
param: this.invocation.param,
tags: this.invocation.opts.tags,
poll: true,
},
);

// poll the completion of the durable promise
promise.completed.then((p) => (p.resolved ? this.invocation.resolve(p.value) : this.invocation.reject(p.error)));
promise.completed.then((p) =>
p.resolved ? this.invocation.resolve(p.value()) : this.invocation.reject(p.error()),
);
} catch (e) {
// if an error occurs, kill the invocation
this.invocation.kill(e);
Expand Down Expand Up @@ -141,14 +152,18 @@ export class GeneratorExecution<T> extends Execution<T> {
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{ idempotencyKey: this.invocation.idempotencyKey },
{
idempotencyKey: this.invocation.idempotencyKey,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
);

// resolve/reject the invocation if already completed
if (promise.resolved) {
this.invocation.resolve(promise.value);
this.invocation.resolve(promise.value());
} else if (promise.rejected || promise.canceled || promise.timedout) {
this.invocation.reject(promise.error);
this.invocation.reject(promise.error());
}
return promise;
} catch (e) {
Expand All @@ -164,9 +179,9 @@ export class GeneratorExecution<T> extends Execution<T> {

// resolve/reject the invocation
if (promise.resolved) {
this.invocation.resolve(promise.value);
this.invocation.resolve(promise.value());
} else if (promise.rejected || promise.canceled || promise.timedout) {
this.invocation.reject(promise.error);
this.invocation.reject(promise.error());
}
} catch (e) {
// if an error occurs, kill the invocation
Expand All @@ -183,9 +198,9 @@ export class GeneratorExecution<T> extends Execution<T> {

// resolve/reject the invocation
if (promise.resolved) {
this.invocation.resolve(promise.value);
this.invocation.resolve(promise.value());
} else if (promise.rejected || promise.canceled || promise.timedout) {
this.invocation.reject(promise.error);
this.invocation.reject(promise.error());
}
} catch (e) {
// if an error occurs, kill the invocation
Expand Down
6 changes: 5 additions & 1 deletion lib/core/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export class Invocation<T> {
constructor(
public readonly id: string,
public readonly idempotencyKey: string | undefined,
public readonly param: any,
public readonly opts: Options,
public readonly version: number,
parent?: Invocation<any>,
) {
const { future, resolve, reject } = Future.deferred<T>(this);
Expand All @@ -51,6 +53,8 @@ export class Invocation<T> {
return Math.min(this.createdAt + this.opts.timeout, this.parent?.timeout ?? Infinity);
}

// TODO: move to execution

get killed(): boolean {
return this.root._killed;
}
Expand Down Expand Up @@ -81,7 +85,7 @@ export class Invocation<T> {
const parentOpts = this.parent?.opts ?? this.root.opts;

return isPartialOptions(opts)
? { args: args.slice(0, -1), opts: { ...parentOpts, ...opts } }
? { args: args.slice(0, -1), opts: { ...parentOpts, ...opts, tags: { ...parentOpts.tags, ...opts.tags } } }
: { args, opts: parentOpts };
}
}
10 changes: 10 additions & 0 deletions lib/core/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ export interface ResonateOptions {
*/
retry: IRetry;

/**
* Tags to add to all durable promises.
*/
tags: Record<string, string>;

/**
* A store instance, if provided this will take precedence over a
* remote store.
Expand Down Expand Up @@ -71,6 +76,11 @@ export interface Options {
*/
store: IStore;

/**
* Additional tags to add to the durable promise.
*/
tags: Record<string, string>;

/**
* Overrides the default timeout.
*/
Expand Down
29 changes: 23 additions & 6 deletions lib/core/promises/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ export class DurablePromise<T> {
return this.promise.state === "REJECTED_TIMEDOUT";
}

get value() {
return this.encoder.decode(this.promise.value.data) as T;
param() {
return this.encoder.decode(this.promise.param.data);
}

get error() {
return this.encoder.decode(this.promise.value.data);
value() {
return this.encoder.decode(this.promise.value.data) as T;
}

static async get<T>(store: IPromiseStore, encoder: IEncoder<unknown, string | undefined>, id: string) {
return new DurablePromise<T>(store, encoder, await store.get(id));
error() {
return this.encoder.decode(this.promise.value.data);
}

static async create<T>(
Expand Down Expand Up @@ -146,6 +146,23 @@ export class DurablePromise<T> {
);
}

static async get<T>(store: IPromiseStore, encoder: IEncoder<unknown, string | undefined>, id: string) {
return new DurablePromise<T>(store, encoder, await store.get(id));
}

static async *search(
store: IPromiseStore,
encoder: IEncoder<unknown, string | undefined>,
id: string,
state?: string,
tags?: Record<string, string>,
limit?: number,
): AsyncGenerator<DurablePromise<any>[], void> {
for await (const promises of store.search(id, state, tags, limit)) {
yield promises.map((p) => new DurablePromise(store, encoder, p));
}
}

async resolve(value: T, opts: Partial<CompleteOptions> = {}) {
this.promise = !this.pending
? this.promise
Expand Down
Loading

0 comments on commit 84def39

Please sign in to comment.