Skip to content

Commit

Permalink
Add durable and lock options (#100)
Browse files Browse the repository at this point in the history
* Add durable option

If this option is set to false, an execution will not be
represented by a durable promise. Defaults to true.

* Add lock and additional options

* Allow tryAcquire to override lock expiry

Additionally, constrain the expiry to be the maximum of the
provided value and heartbeat frequency.

* Fix locks and override invocation timeout with promise timeout

* Decouple durable and lock options
  • Loading branch information
dfarr authored Apr 12, 2024
1 parent 448efaa commit d245ad6
Show file tree
Hide file tree
Showing 16 changed files with 704 additions and 228 deletions.
53 changes: 31 additions & 22 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { Execution, OrdinaryExecution, DeferredExecution } from "./core/executio
import { ResonatePromise } from "./core/future";
import { Invocation } from "./core/invocation";
import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { DurablePromise } from "./core/promises/promises";
import { Retry } from "./core/retries/retry";
import * as schedules from "./core/schedules/schedules";
import * as utils from "./core/utils";
import { ResonateBase } from "./resonate";

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -35,6 +35,18 @@ export class Resonate extends ResonateBase {
this.scheduler = new Scheduler(this);
}

protected execute<F extends Func>(
name: string,
id: string,
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise);
}

/**
* Register a function with Resonate. Registered functions can be invoked by calling {@link run}, or by the returned function.
*
Expand Down Expand Up @@ -93,17 +105,6 @@ export class Resonate extends ResonateBase {
schedule(name: string, cron: string, func: Func | string, ...args: any[]): Promise<schedules.Schedule> {
return super.schedule(name, cron, func, ...args);
}

protected execute<F extends Func>(
name: string,
id: string,
idempotencyKey: string | undefined,
func: F,
args: Params<F>,
opts: Options,
): ResonatePromise<Return<F>> {
return this.scheduler.add(name, id, idempotencyKey, func, args, opts);
}
}

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -144,6 +145,13 @@ export class Context {
return this.invocation.idempotencyKey;
}

/**
* All configured options for this context.
*/
get opts() {
return this.invocation.opts;
}

/**
* The timestamp in ms, once this time elapses the function invocation will timeout.
*/
Expand Down Expand Up @@ -200,28 +208,28 @@ export class Context {
// human readable name of the function
const name = typeof func === "string" ? func : func.name;

// the idempotency key is a hash of the id
const idempotencyKey = utils.hash(id);

// opts are optional and can be provided as the last arg
const { args, opts } = this.invocation.split(argsWithOpts);

// default opts never change
const defaults = this.invocation.defaults;

// param is only required for deferred executions
const param = typeof func === "string" ? args[0] : undefined;

// create a new invocation
const invocation = new Invocation(name, id, idempotencyKey, undefined, param, opts, parent);
const invocation = new Invocation(name, id, undefined, param, opts, defaults, parent);

let execution: Execution<any>;
if (typeof func === "string") {
// create a deferred execution
// this execution will be fulfilled out-of-process
execution = new DeferredExecution(invocation);
execution = new DeferredExecution(this.resonate, invocation);
} else {
// create an ordinary execution
// this execution wraps a user-provided function
const ctx = new Context(this.resonate, invocation);
execution = new OrdinaryExecution(invocation, () => func(ctx, ...args));
execution = new OrdinaryExecution(this.resonate, invocation, () => func(ctx, ...args));
}

// bump the counter
Expand Down Expand Up @@ -391,14 +399,15 @@ class Scheduler {
add<F extends Func>(
name: string,
id: string,
idempotencyKey: string | undefined,
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
// if the execution is already running, and not killed,
// return the promise
if (this.cache[id] && !this.cache[id].killed) {
if (opts.durable && this.cache[id] && !this.cache[id].killed) {
// execute is idempotent
return this.cache[id].execute();
}
Expand All @@ -411,11 +420,11 @@ class Scheduler {
};

// create a new invocation
const invocation = new Invocation<Return<F>>(name, id, idempotencyKey, undefined, param, opts);
const invocation = new Invocation<Return<F>>(name, id, undefined, param, opts, defaults);

// create a new execution
const ctx = new Context(this.resonate, invocation);
const execution = new OrdinaryExecution(invocation, () => func(ctx, ...args));
const execution = new OrdinaryExecution(this.resonate, invocation, () => func(ctx, ...args), durablePromise);

// store the execution,
// will be used if run is called again with the same id
Expand Down
Loading

0 comments on commit d245ad6

Please sign in to comment.