Skip to content

Commit

Permalink
Removes "default" options from the invocation (#125)
Browse files Browse the repository at this point in the history
Default options in the invocation were in reality
the registered options from the top level function call.
This commit changes the code to express that more clearly and
removes the split functions that were a source of confusion.
  • Loading branch information
avillega authored Jun 24, 2024
1 parent cd7d6b7 commit 7bf23ef
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 144 deletions.
31 changes: 22 additions & 9 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { DurablePromise } from "./core/promises/promises";
import * as retryPolicy from "./core/retry";
import * as schedules from "./core/schedules/schedules";
import * as utils from "./core/utils";
import { ResonateBase } from "./resonate";

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -41,10 +42,9 @@ export class Resonate extends ResonateBase {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise);
return this.scheduler.add(name, id, func, args, opts, durablePromise);
}

/**
Expand Down Expand Up @@ -210,22 +210,36 @@ export class Context {
// the id is either:
// 1. a provided string in the case of a deferred execution
// 2. a generated string in the case of an ordinary execution
const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}`;
const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}.${func.name}`;

// human readable name of the function
const name = typeof func === "string" ? func : func.name;

// opts are optional and can be provided as the last arg
const { args, opts } = this.invocation.split(argsWithOpts);
const { args, opts: givenOpts } = utils.split(argsWithOpts);
const registeredOptions = this.resonate.registeredOptions(
this.invocation.root.name,
this.invocation.root.opts.version,
);
const resonateOptions = this.resonate.defaults();

const opts = {
...resonateOptions,
...registeredOptions,
...givenOpts,
};

// Merge the tags
opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...givenOpts.tags };

// default opts never change
const defaults = this.invocation.defaults;
// Default lock is false for children execution
opts.lock = opts.lock ?? false;

// param is only required for deferred executions
const param = typeof func === "string" ? args[0] : undefined;

// create a new invocation
const invocation = new Invocation(name, id, undefined, param, opts, defaults, parent);
const invocation = new Invocation(name, id, undefined, param, opts, parent);

let execution: Execution<any>;
if (typeof func === "string") {
Expand Down Expand Up @@ -434,7 +448,6 @@ class Scheduler {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
// if the execution is already running, and not killed,
Expand All @@ -453,7 +466,7 @@ class Scheduler {
};

// create a new invocation
const invocation = new Invocation<Return<F>>(name, id, undefined, param, opts, defaults);
const invocation = new Invocation<Return<F>>(name, id, undefined, param, opts);

// create a new execution
const ctx = new Context(this.resonate, invocation);
Expand Down
56 changes: 26 additions & 30 deletions lib/core/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,21 @@ export class OrdinaryExecution<T> extends Execution<T> {
}

protected async fork() {
if (this.invocation.opts.durable) {
if (this.invocation.opts.durable && !this.durablePromise) {
// if durable, create a durable promise
try {
this.durablePromise =
this.durablePromise ??
(await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
));
this.durablePromise = await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
);
} catch (e) {
// if an error occurs, kill the execution
this.kill(e);
Expand Down Expand Up @@ -265,22 +263,20 @@ export class GeneratorExecution<T> extends Execution<T> {

async create() {
try {
if (this.invocation.opts.durable) {
if (this.invocation.opts.durable && !this.durablePromise) {
// create a durable promise
this.durablePromise =
this.durablePromise ??
(await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
));
this.durablePromise = await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
);

// resolve/reject the invocation if already completed
if (this.durablePromise.resolved) {
Expand Down
33 changes: 3 additions & 30 deletions lib/core/invocation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Future } from "./future";
import { Options, PartialOptions, isOptions } from "./options";
import { Options } from "./options";
import { RetryPolicy, exponential } from "./retry";

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -44,7 +44,6 @@ export class Invocation<T> {
public readonly headers: Record<string, string> | undefined,
public readonly param: unknown,
public readonly opts: Options,
public readonly defaults: Options,
parent?: Invocation<any>,
) {
// create a future and hold on to its resolvers
Expand All @@ -59,13 +58,12 @@ export class Invocation<T> {
// get the execution id from either:
// - a hard coded string
// - a function that returns a string given the invocation id
this.eid = typeof this.opts.eid === "function" ? this.opts.eid(this.id) : this.opts.eid;
this.eid = this.opts.eidFn(this.id);

// get the idempotency key from either:
// - a hard coded string
// - a function that returns a string given the invocation id
this.idempotencyKey =
typeof this.opts.idempotencyKey === "function" ? this.opts.idempotencyKey(this.id) : this.opts.idempotencyKey;
this.idempotencyKey = this.opts.idempotencyKeyFn(this.id);

// the timeout is the minimum of:
// - the current time plus the user provided relative time
Expand All @@ -89,29 +87,4 @@ export class Invocation<T> {
unblock() {
this.blocked = null;
}

split(args: [...any, PartialOptions?]): { args: any[]; opts: Options } {
let opts = args[args.length - 1];

// merge opts
if (isOptions(opts)) {
args = args.slice(0, -1);
opts = {
...this.defaults,
...opts,
tags: { ...this.defaults.tags, ...opts.tags }, // tags are merged
};
} else {
// copy defaults
opts = { ...this.defaults };
}

// lock is false by default
opts.lock = opts.lock ?? false;

// version cannot be overridden
opts.version = this.defaults.version;

return { args, opts };
}
}
10 changes: 6 additions & 4 deletions lib/core/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,21 @@ export type Options = {
durable: boolean;

/**
* A unique id for this execution, defaults to a random id.
* A function that calculates the id for this execution
* defaults to a random funciton.
*/
eid: string | ((id: string) => string);
eidFn: (id: string) => string;

/**
* Overrides the default encoder.
*/
encoder: IEncoder<unknown, string | undefined>;

/**
* Overrides the default idempotency key.
* Overrides the default funciton to calculate the idempotency key.
* defaults to a variation fnv-1a the hash funciton.
*/
idempotencyKey: string | ((id: string) => string);
idempotencyKeyFn: (id: string) => string;

/**
* Acquire a lock for the execution.
Expand Down
21 changes: 9 additions & 12 deletions lib/core/promises/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,16 @@ export class DurablePromise<T> {
timeout: number,
opts: Partial<CreateOptions> = {},
) {
return new DurablePromise<T>(
store,
encoder,
await store.create(
id,
opts.idempotencyKey,
opts.strict ?? false,
opts.headers,
encoder.encode(opts.param),
timeout,
opts.tags,
),
const storedPromise = await store.create(
id,
opts.idempotencyKey,
opts.strict ?? false,
opts.headers,
encoder.encode(opts.param),
timeout,
opts.tags,
);
return new DurablePromise<T>(store, encoder, storedPromise);
}

/**
Expand Down
9 changes: 9 additions & 0 deletions lib/core/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Options, isOptions } from "./options";

export function randomId(): string {
return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16);
}
Expand All @@ -13,3 +15,10 @@ export function hash(s: string): string {
const maxLength = 8;
return "0".repeat(Math.max(0, maxLength - hashString.length)) + hashString;
}

export function split(argsWithOpts: any[]): { args: any[]; opts: Partial<Options> } {
const possibleOpts = argsWithOpts.at(-1);
return isOptions(possibleOpts)
? { args: argsWithOpts.slice(0, -1), opts: possibleOpts }
: { args: argsWithOpts, opts: {} };
}
25 changes: 13 additions & 12 deletions lib/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Invocation } from "./core/invocation";
import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { DurablePromise } from "./core/promises/promises";
import * as schedules from "./core/schedules/schedules";
import * as utils from "./core/utils";
import { ResonateBase } from "./resonate";

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -78,10 +79,9 @@ export class Resonate extends ResonateBase {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise);
return this.scheduler.add(name, id, func, args, opts, durablePromise);
}

/**
Expand Down Expand Up @@ -195,7 +195,10 @@ export class Info {
}

export class Context {
constructor(private invocation: Invocation<any>) {}
constructor(
private resonate: Resonate,
private invocation: Invocation<any>,
) {}

/**
* The running count of child function invocations.
Expand Down Expand Up @@ -313,7 +316,9 @@ export class Context {
}

private _call(func: string | ((...args: any[]) => any), argsWithOpts: any[], yieldFuture: boolean): Call {
const { args, opts } = this.invocation.split(argsWithOpts);
const { args, opts: givenOpts } = utils.split(argsWithOpts);

const opts = this.resonate.defaults(givenOpts);

if (typeof func === "string") {
return { kind: "call", value: { kind: "deferred", func, args, opts }, yieldFuture };
Expand Down Expand Up @@ -366,7 +371,6 @@ class Scheduler {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
// if the execution is already running, and not killed,
Expand All @@ -383,10 +387,10 @@ class Scheduler {
};

// create a new invocation
const invocation = new Invocation(name, id, undefined, param, opts, defaults);
const invocation = new Invocation(name, id, undefined, param, opts);

// create a new execution
const generator = func(new Context(invocation), ...args);
const generator = func(new Context(this.resonate, invocation), ...args);
const execution = new GeneratorExecution(this.resonate, invocation, generator, durablePromise);

// once the durable promise has been created,
Expand Down Expand Up @@ -504,14 +508,11 @@ class Scheduler {
// human readable name of the function
const name = value.kind === "deferred" ? value.func : value.func.name;

// default opts never change
const defaults = parent.defaults;

// param is only required for deferred executions
const param = value.kind === "deferred" ? value.args[0] : undefined;

// create a new invocation
const invocation = new Invocation(name, id, undefined, param, value.opts, defaults, parent);
const invocation = new Invocation(name, id, undefined, param, value.opts, parent);

// add child and increment counter
parent.addChild(invocation);
Expand All @@ -521,7 +522,7 @@ class Scheduler {

if (value.kind === "resonate") {
// create a generator execution
const ctx = new Context(invocation);
const ctx = new Context(this.resonate, invocation);
execution = new GeneratorExecution(this.resonate, invocation, value.func(ctx, ...value.args));

await execution.create();
Expand Down
Loading

0 comments on commit 7bf23ef

Please sign in to comment.