Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes "default" options from the invocation #125

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { DurablePromise } from "./core/promises/promises";
import * as retryPolicy from "./core/retry";
import * as schedules from "./core/schedules/schedules";
import * as utils from "./core/utils";
import { ResonateBase } from "./resonate";

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -41,10 +42,9 @@ export class Resonate extends ResonateBase {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise);
return this.scheduler.add(name, id, func, args, opts, durablePromise);
}

/**
Expand Down Expand Up @@ -210,22 +210,36 @@ export class Context {
// the id is either:
// 1. a provided string in the case of a deferred execution
// 2. a generated string in the case of an ordinary execution
const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}`;
const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}.${func.name}`;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably the controvertial change in this PR. I added the name of the function to the id. My idea is to have more insight into the distributed call graph, once we get to observability.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! I like that it's informational only, we don't rely on the name of the function to ensure uniqueness


// human readable name of the function
const name = typeof func === "string" ? func : func.name;

// opts are optional and can be provided as the last arg
const { args, opts } = this.invocation.split(argsWithOpts);
const { args, opts: givenOpts } = utils.split(argsWithOpts);
const registeredOptions = this.resonate.registeredOptions(
this.invocation.root.name,
this.invocation.root.opts.version,
);
const resonateOptions = this.resonate.defaults();

const opts = {
...resonateOptions,
...registeredOptions,
...givenOpts,
};

// Merge the tags
opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...givenOpts.tags };

// default opts never change
const defaults = this.invocation.defaults;
// Default lock is false for children execution
opts.lock = opts.lock ?? false;

// param is only required for deferred executions
const param = typeof func === "string" ? args[0] : undefined;

// create a new invocation
const invocation = new Invocation(name, id, undefined, param, opts, defaults, parent);
const invocation = new Invocation(name, id, undefined, param, opts, parent);

let execution: Execution<any>;
if (typeof func === "string") {
Expand Down Expand Up @@ -434,7 +448,6 @@ class Scheduler {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
// if the execution is already running, and not killed,
Expand All @@ -453,7 +466,7 @@ class Scheduler {
};

// create a new invocation
const invocation = new Invocation<Return<F>>(name, id, undefined, param, opts, defaults);
const invocation = new Invocation<Return<F>>(name, id, undefined, param, opts);

// create a new execution
const ctx = new Context(this.resonate, invocation);
Expand Down
56 changes: 26 additions & 30 deletions lib/core/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,21 @@ export class OrdinaryExecution<T> extends Execution<T> {
}

protected async fork() {
if (this.invocation.opts.durable) {
if (this.invocation.opts.durable && !this.durablePromise) {
// if durable, create a durable promise
try {
this.durablePromise =
this.durablePromise ??
(await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
));
this.durablePromise = await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
);
} catch (e) {
// if an error occurs, kill the execution
this.kill(e);
Expand Down Expand Up @@ -265,22 +263,20 @@ export class GeneratorExecution<T> extends Execution<T> {

async create() {
try {
if (this.invocation.opts.durable) {
if (this.invocation.opts.durable && !this.durablePromise) {
// create a durable promise
this.durablePromise =
this.durablePromise ??
(await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
));
this.durablePromise = await DurablePromise.create<T>(
this.resonate.store.promises,
this.invocation.opts.encoder,
this.invocation.id,
this.invocation.timeout,
{
idempotencyKey: this.invocation.idempotencyKey,
headers: this.invocation.headers,
param: this.invocation.param,
tags: this.invocation.opts.tags,
},
);

// resolve/reject the invocation if already completed
if (this.durablePromise.resolved) {
Expand Down
33 changes: 3 additions & 30 deletions lib/core/invocation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Future } from "./future";
import { Options, PartialOptions, isOptions } from "./options";
import { Options } from "./options";
import { RetryPolicy, exponential } from "./retry";

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -44,7 +44,6 @@ export class Invocation<T> {
public readonly headers: Record<string, string> | undefined,
public readonly param: unknown,
public readonly opts: Options,
public readonly defaults: Options,
parent?: Invocation<any>,
) {
// create a future and hold on to its resolvers
Expand All @@ -59,13 +58,12 @@ export class Invocation<T> {
// get the execution id from either:
// - a hard coded string
// - a function that returns a string given the invocation id
this.eid = typeof this.opts.eid === "function" ? this.opts.eid(this.id) : this.opts.eid;
this.eid = this.opts.eidFn(this.id);

// get the idempotency key from either:
// - a hard coded string
// - a function that returns a string given the invocation id
this.idempotencyKey =
typeof this.opts.idempotencyKey === "function" ? this.opts.idempotencyKey(this.id) : this.opts.idempotencyKey;
this.idempotencyKey = this.opts.idempotencyKeyFn(this.id);

// the timeout is the minimum of:
// - the current time plus the user provided relative time
Expand All @@ -89,29 +87,4 @@ export class Invocation<T> {
unblock() {
this.blocked = null;
}

split(args: [...any, PartialOptions?]): { args: any[]; opts: Options } {
let opts = args[args.length - 1];

// merge opts
if (isOptions(opts)) {
args = args.slice(0, -1);
opts = {
...this.defaults,
...opts,
tags: { ...this.defaults.tags, ...opts.tags }, // tags are merged
};
} else {
// copy defaults
opts = { ...this.defaults };
}

// lock is false by default
opts.lock = opts.lock ?? false;

// version cannot be overridden
opts.version = this.defaults.version;

return { args, opts };
}
}
10 changes: 6 additions & 4 deletions lib/core/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,21 @@ export type Options = {
durable: boolean;

/**
* A unique id for this execution, defaults to a random id.
* A function that calculates the id for this execution
* defaults to a random funciton.
*/
eid: string | ((id: string) => string);
eidFn: (id: string) => string;

/**
* Overrides the default encoder.
*/
encoder: IEncoder<unknown, string | undefined>;

/**
* Overrides the default idempotency key.
* Overrides the default funciton to calculate the idempotency key.
* defaults to a variation fnv-1a the hash funciton.
*/
idempotencyKey: string | ((id: string) => string);
idempotencyKeyFn: (id: string) => string;

/**
* Acquire a lock for the execution.
Expand Down
21 changes: 9 additions & 12 deletions lib/core/promises/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,16 @@ export class DurablePromise<T> {
timeout: number,
opts: Partial<CreateOptions> = {},
) {
return new DurablePromise<T>(
store,
encoder,
await store.create(
id,
opts.idempotencyKey,
opts.strict ?? false,
opts.headers,
encoder.encode(opts.param),
timeout,
opts.tags,
),
const storedPromise = await store.create(
id,
opts.idempotencyKey,
opts.strict ?? false,
opts.headers,
encoder.encode(opts.param),
timeout,
opts.tags,
);
return new DurablePromise<T>(store, encoder, storedPromise);
}

/**
Expand Down
9 changes: 9 additions & 0 deletions lib/core/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Options, isOptions } from "./options";

export function randomId(): string {
return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16);
}
Expand All @@ -13,3 +15,10 @@ export function hash(s: string): string {
const maxLength = 8;
return "0".repeat(Math.max(0, maxLength - hashString.length)) + hashString;
}

export function split(argsWithOpts: any[]): { args: any[]; opts: Partial<Options> } {
const possibleOpts = argsWithOpts.at(-1);
return isOptions(possibleOpts)
? { args: argsWithOpts.slice(0, -1), opts: possibleOpts }
: { args: argsWithOpts, opts: {} };
}
25 changes: 13 additions & 12 deletions lib/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Invocation } from "./core/invocation";
import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { DurablePromise } from "./core/promises/promises";
import * as schedules from "./core/schedules/schedules";
import * as utils from "./core/utils";
import { ResonateBase } from "./resonate";

/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -78,10 +79,9 @@ export class Resonate extends ResonateBase {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise);
return this.scheduler.add(name, id, func, args, opts, durablePromise);
}

/**
Expand Down Expand Up @@ -195,7 +195,10 @@ export class Info {
}

export class Context {
constructor(private invocation: Invocation<any>) {}
constructor(
private resonate: Resonate,
private invocation: Invocation<any>,
) {}

/**
* The running count of child function invocations.
Expand Down Expand Up @@ -313,7 +316,9 @@ export class Context {
}

private _call(func: string | ((...args: any[]) => any), argsWithOpts: any[], yieldFuture: boolean): Call {
const { args, opts } = this.invocation.split(argsWithOpts);
const { args, opts: givenOpts } = utils.split(argsWithOpts);

const opts = this.resonate.defaults(givenOpts);

if (typeof func === "string") {
return { kind: "call", value: { kind: "deferred", func, args, opts }, yieldFuture };
Expand Down Expand Up @@ -366,7 +371,6 @@ class Scheduler {
func: F,
args: Params<F>,
opts: Options,
defaults: Options,
durablePromise?: DurablePromise<any>,
): ResonatePromise<Return<F>> {
// if the execution is already running, and not killed,
Expand All @@ -383,10 +387,10 @@ class Scheduler {
};

// create a new invocation
const invocation = new Invocation(name, id, undefined, param, opts, defaults);
const invocation = new Invocation(name, id, undefined, param, opts);

// create a new execution
const generator = func(new Context(invocation), ...args);
const generator = func(new Context(this.resonate, invocation), ...args);
const execution = new GeneratorExecution(this.resonate, invocation, generator, durablePromise);

// once the durable promise has been created,
Expand Down Expand Up @@ -504,14 +508,11 @@ class Scheduler {
// human readable name of the function
const name = value.kind === "deferred" ? value.func : value.func.name;

// default opts never change
const defaults = parent.defaults;

// param is only required for deferred executions
const param = value.kind === "deferred" ? value.args[0] : undefined;

// create a new invocation
const invocation = new Invocation(name, id, undefined, param, value.opts, defaults, parent);
const invocation = new Invocation(name, id, undefined, param, value.opts, parent);

// add child and increment counter
parent.addChild(invocation);
Expand All @@ -521,7 +522,7 @@ class Scheduler {

if (value.kind === "resonate") {
// create a generator execution
const ctx = new Context(invocation);
const ctx = new Context(this.resonate, invocation);
execution = new GeneratorExecution(this.resonate, invocation, value.func(ctx, ...value.args));

await execution.create();
Expand Down
Loading
Loading