Skip to content

Commit

Permalink
Remove io (#92)
Browse files Browse the repository at this point in the history
All functions must be run through context.run
  • Loading branch information
dfarr authored Mar 28, 2024
1 parent ffcfbe8 commit 570cf32
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 99 deletions.
91 changes: 11 additions & 80 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ import { Execution, OrdinaryExecution, DeferredExecution } from "./core/executio
import { ResonatePromise } from "./core/future";
import { Invocation } from "./core/invocation";
import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { Retry } from "./core/retries/retry";
import * as utils from "./core/utils";
import { ResonateBase } from "./resonate";

/////////////////////////////////////////////////////////////////////
// Types
/////////////////////////////////////////////////////////////////////

export type AFunc = (ctx: Context, ...args: any[]) => any;

export type IFunc = (info: Info, ...args: any[]) => any;
export type Func = (ctx: Context, ...args: any[]) => any;

export type Params<F> = F extends (ctx: any, ...args: infer P) => any ? P : never;

Expand Down Expand Up @@ -45,7 +42,7 @@ export class Resonate extends ResonateBase {
* @param opts Resonate options, can be constructed by calling {@link options}.
* @returns Resonate function
*/
register<F extends AFunc>(
register<F extends Func>(
name: string,
func: F,
opts?: Partial<Options>,
Expand All @@ -61,13 +58,13 @@ export class Resonate extends ResonateBase {
* @param opts Resonate options, can be constructed by calling {@link options}.
* @returns Resonate function
*/
register<F extends AFunc>(
register<F extends Func>(
name: string,
version: number,
func: F,
opts?: Partial<Options>,
): (id: string, ...args: any) => ResonatePromise<Return<F>>;
register<F extends AFunc>(
register<F extends Func>(
name: string,
funcOrVersion: F | number,
funcOrOpts: F | Partial<Options>,
Expand All @@ -83,11 +80,11 @@ export class Resonate extends ResonateBase {
* @param opts Resonate options, can be constructed by calling {@link options}.
* @returns Resonate function
*/
registerModule<F extends AFunc>(module: Record<string, F>, opts: Partial<Options> = {}) {
registerModule<F extends Func>(module: Record<string, F>, opts: Partial<Options> = {}) {
super.registerModule(module, opts);
}

protected execute<F extends AFunc>(
protected execute<F extends Func>(
name: string,
version: number,
id: string,
Expand All @@ -103,7 +100,7 @@ export class Resonate extends ResonateBase {
// Context
/////////////////////////////////////////////////////////////////////

export class Info {
export class Context {
constructor(private invocation: Invocation<any>) {}

/**
Expand All @@ -113,38 +110,6 @@ export class Info {
return this.invocation.attempt;
}

/**
* Uniquely identifies the function invocation.
*/
get id() {
return this.invocation.id;
}

/**
* Deduplicates function invocations with the same id.
*/
get idempotencyKey() {
return this.invocation.idempotencyKey;
}

/**
* The timestamp in ms, once this time elapses the function invocation will timeout.
*/
get timeout() {
return this.invocation.timeout;
}

/**
* The resonate function version.
*/
get version() {
return this.invocation.version;
}
}

export class Context {
constructor(private invocation: Invocation<any>) {}

/**
* The running count of child function invocations.
*/
Expand Down Expand Up @@ -188,7 +153,7 @@ export class Context {
* @param args The function arguments, optionally followed by {@link options}.
* @returns A promise that resolves to the return value of the function.
*/
run<F extends AFunc>(func: F, ...args: [...Params<F>, PartialOptions?]): ResonatePromise<Return<F>>;
run<F extends Func>(func: F, ...args: [...Params<F>, PartialOptions?]): ResonatePromise<Return<F>>;

/**
* Invoke a remote function.
Expand Down Expand Up @@ -243,10 +208,10 @@ export class Context {
// this execution will be fulfilled out-of-process
execution = new DeferredExecution(invocation);
} else {
// create an ordinary execution// human readable name of the function
// create an ordinary execution
// this execution wraps a user-provided function
const ctx = new Context(invocation);
execution = new OrdinaryExecution(invocation, () => func(ctx, ...args), Retry.never());
execution = new OrdinaryExecution(invocation, () => func(ctx, ...args));
}

// bump the counter
Expand All @@ -256,40 +221,6 @@ export class Context {
return execution.execute();
}

/**
* Invoke an io function.
*
* @template F The type of the function.
* @param func The function to invoke.
* @param args The function arguments, optionally followed by {@link options}.
* @returns A promise that resolves to the return value of the function.
*/
io<F extends IFunc>(func: F, ...args: [...Params<F>, PartialOptions?]): ResonatePromise<Return<F>>;
io(func: (...args: any[]) => any, ...argsWithOpts: any[]): ResonatePromise<any> {
const parent = this.invocation;

const id = `${parent.id}.${parent.counter}`;
const idempotencyKey = utils.hash(id);
const { args, opts } = this.invocation.split(argsWithOpts);

const name = func.name;
const version = parent.version;

const invocation = new Invocation(name, version, id, idempotencyKey, undefined, undefined, opts, parent);

// create an ordinary execution
// this execution wraps a user-provided io function
// unlike run, an io function can not create child invocations
const info = new Info(invocation);
const execution = new OrdinaryExecution(invocation, () => func(info, ...args));

// bump the counter
parent.counter++;

// return a resonate promise
return execution.execute();
}

/**
* Construct options.
*
Expand All @@ -308,7 +239,7 @@ export class Context {
class Scheduler {
private cache: Record<string, Execution<any>> = {};

add<F extends AFunc>(
add<F extends Func>(
name: string,
version: number,
id: string,
Expand Down
10 changes: 7 additions & 3 deletions lib/core/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ export class Invocation<T> {

split(args: [...any, PartialOptions?]): { args: any[]; opts: Options } {
const opts = args[args.length - 1];
const parentOpts = this.parent?.opts ?? this.root.opts;

// defaults are specified on the root invocation
// this means that overrides only apply to the current invocation
// and do no propagate to children
const defaults = this.root.opts;

return isPartialOptions(opts)
? { args: args.slice(0, -1), opts: { ...parentOpts, ...opts, tags: { ...parentOpts.tags, ...opts.tags } } }
: { args, opts: parentOpts };
? { args: args.slice(0, -1), opts: { ...defaults, ...opts, tags: { ...defaults.tags, ...opts.tags } } }
: { args, opts: defaults };
}
}
22 changes: 6 additions & 16 deletions test/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ describe("Functions: async", () => {
return await ctx.run(func);
}

async function io(ctx: Context, func: any) {
return await ctx.io(func);
}

function ordinarySuccess() {
return "foo";
}
Expand Down Expand Up @@ -45,7 +41,6 @@ describe("Functions: async", () => {
});

resonate.register("run", run);
resonate.register("io", io);
resonate.register("success", ordinarySuccess);
resonate.register("successAsync", ordinarySuccessAsync);

Expand All @@ -60,11 +55,9 @@ describe("Functions: async", () => {
await ordinarySuccessAsync(),
await resonate.run("run", "run.a", ordinarySuccess),
await resonate.run("run", "run.b", ordinarySuccessAsync),
await resonate.run("io", "run.c", ordinarySuccess),
await resonate.run("io", "run.d", ordinarySuccessAsync),
await resonate.run("run", "run.e", deferredSuccess),
await resonate.run("success", "run.f"),
await resonate.run("successAsync", "run.g"),
await resonate.run("run", "run.c", deferredSuccess),
await resonate.run("success", "run.d"),
await resonate.run("successAsync", "run.e"),
];

expect(results.every((r) => r === "foo")).toBe(true);
Expand All @@ -77,7 +70,6 @@ describe("Functions: async", () => {
});

resonate.register("run", run);
resonate.register("io", io);
resonate.register("failure", ordinaryFailure);
resonate.register("failureAsync", ordinaryFailureAsync);

Expand All @@ -91,11 +83,9 @@ describe("Functions: async", () => {
() => ordinaryFailureAsync(),
() => resonate.run("run", "run.a", ordinaryFailure),
() => resonate.run("run", "run.b", ordinaryFailureAsync),
() => resonate.run("io", "run.c", ordinaryFailure),
() => resonate.run("io", "run.d", ordinaryFailureAsync),
() => resonate.run("run", "run.e", deferredFailure),
() => resonate.run("failure", "run.f"),
() => resonate.run("failureAsync", "run.g"),
() => resonate.run("run", "run.c", deferredFailure),
() => resonate.run("failure", "run.d"),
() => resonate.run("failureAsync", "run.e"),
];

for (const f of functions) {
Expand Down

0 comments on commit 570cf32

Please sign in to comment.