diff --git a/lib/async.ts b/lib/async.ts index fa0381a..8d61e47 100644 --- a/lib/async.ts +++ b/lib/async.ts @@ -1,3 +1,4 @@ +import { Func, isRFC, LFC, Params, Return, RFC } from "./core/calls"; import { Execution, OrdinaryExecution, DeferredExecution } from "./core/execution"; import { ResonatePromise } from "./core/future"; import { Invocation } from "./core/invocation"; @@ -12,12 +13,6 @@ import { ResonateBase } from "./resonate"; // Types ///////////////////////////////////////////////////////////////////// -export type Func = (ctx: Context, ...args: any[]) => any; - -export type Params = F extends (ctx: any, ...args: infer P) => any ? P : never; - -export type Return = F extends (...args: any[]) => infer T ? Awaited : never; - ///////////////////////////////////////////////////////////////////// // Resonate ///////////////////////////////////////////////////////////////////// @@ -183,6 +178,15 @@ export class Context { */ run(func: F, ...args: [...Params, PartialOptions?]): ResonatePromise>; + /** + * Invoke a Local Function Call (LFC). + * + * @template F - A type extending Function + * @param {LFC} lfc - The Local Function Call configuration + * @returns {ReturnType} The result of the function execution + */ + run(lfc: LFC): ResonatePromise>; + /** * Invoke a remote function. * @@ -195,7 +199,19 @@ export class Context { run(func: string, args: any, opts?: PartialOptions): ResonatePromise; /** - * Invoke a remote function. + * Invoke a Remote Function Call (RFC). + * + * @param {RFC} rfc - The Remote Function Call configuration + * @returns {ResonatePromise} A promise that resolves with the result of the remote execution + * + * @description + * This function takes a Remote Function Call (RFC) configuration and executes the specified function + * remotely with the provided options. + */ + run(rfc: RFC): ResonatePromise; + + /** + * Invoke a remote function without arguments. * * @template T The return type of the remote function. * @param func The id of the remote function. @@ -203,46 +219,88 @@ export class Context { * @returns A promise that resolves to the resolved value of the remote function. */ run(func: string, opts?: PartialOptions): ResonatePromise; - run(func: string | ((...args: any[]) => any), ...argsWithOpts: any[]): ResonatePromise { + + run(funcOrFc: F | LFC | RFC | string, ...argsWithOpts: any[]): ResonatePromise | T> { + // Function with args and possibly options + if (typeof funcOrFc === "function") { + const { args, opts } = utils.split(argsWithOpts); + const lfc: LFC = { + func: funcOrFc, + args, + opts: opts, + }; + + return this._run(lfc); + } + + // String (function name) with args and possibly options + if (typeof funcOrFc === "string") { + const funcName = funcOrFc; + const { args, opts } = utils.split(argsWithOpts); + + const rfc: RFC = { + funcName, + args, + opts, + }; + + return this._run(rfc); + } + + // We are sure it is an FC + return this._run(funcOrFc); + } + + private _run(fc: LFC | RFC): ResonatePromise | T> { // the parent is the current invocation const parent = this.invocation; + // human readable name of the function + let name!: string; + // the id is either: // 1. a provided string in the case of a deferred execution // 2. a generated string in the case of an ordinary execution - const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}.${func.name}`; + let id!: string; - // human readable name of the function - const name = typeof func === "string" ? func : func.name; + // Params to store with the durable promise. For RemoteExecutions we just encode the + // given arg(s). Otherwise we store nothing since it is unncesary. + let param: any | undefined; + + if (isRFC(fc)) { + name = fc.funcName; + id = name; + param = fc.args; + } else { + name = fc.func.name; + id = `${parent.id}.${parent.counter}.${name}`; + param = undefined; + } - // opts are optional and can be provided as the last arg - const { args, opts: givenOpts } = utils.split(argsWithOpts); const registeredOptions = this.resonate.registeredOptions( this.invocation.root.name, this.invocation.root.opts.version, ); + const resonateOptions = this.resonate.defaults(); const opts = { ...resonateOptions, ...registeredOptions, - ...givenOpts, + ...fc.opts, }; // Merge the tags - opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...givenOpts.tags }; + opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...fc.opts?.tags }; // Default lock is false for children execution opts.lock = opts.lock ?? false; - // param is only required for deferred executions - const param = typeof func === "string" ? args[0] : undefined; - // create a new invocation const invocation = new Invocation(name, id, undefined, param, opts, parent); let execution: Execution; - if (typeof func === "string") { + if (isRFC(fc)) { // create a deferred execution // this execution will be fulfilled out-of-process execution = new DeferredExecution(this.resonate, invocation); @@ -250,7 +308,7 @@ export class Context { // create an ordinary execution // this execution wraps a user-provided function const ctx = new Context(this.resonate, invocation); - execution = new OrdinaryExecution(this.resonate, invocation, () => func(ctx, ...args)); + execution = new OrdinaryExecution(this.resonate, invocation, () => fc.func(ctx, ...(fc.args ?? []))); } // bump the counter diff --git a/lib/core/calls.ts b/lib/core/calls.ts new file mode 100644 index 0000000..2f8f5d6 --- /dev/null +++ b/lib/core/calls.ts @@ -0,0 +1,52 @@ +import { Context } from ".."; +import { Options } from "./options"; + +// The type of a resonate function +export type Func = (ctx: Context, ...args: any[]) => any; + +// The args of a resonate function excluding the context argument +export type Params = F extends (ctx: any, ...args: infer P) => any ? P : never; + +// The return type of a resonate function +export type Return = F extends (...args: any[]) => infer T ? Awaited : never; + +// A top level function call, to be used in the with `resonate.run` +export type TFC = { + // Fuction Name of the function to be called, has to be previously registered + funcName: string; + // Given id for this execution, ids have to be distinct between top level calls + id: string; + // args to be passed to the specified func. + args?: any[]; + // opts to override the registered Options. Only the subset of options stored + // with the DurablePromise can be overriden that way we can have the same set + // of options when running in the recovery path. + optsOverrides?: Partial< + Pick + >; +}; + +// Remote function call +export type RFC = { + funcName: string; + args?: any; + opts?: Partial; +}; + +// Remote function call +export type LFC = { + func: F; + args?: any[]; + opts?: Partial; +}; + +// Type guard for RFC +export function isRFC(obj: any): obj is RFC { + return ( + typeof obj === "object" && + obj !== null && + typeof obj.funcName === "string" && + (obj.args === undefined || typeof obj.args === "object") && + (obj.opts === undefined || (typeof obj.opts === "object" && obj.opts !== null)) + ); +} diff --git a/lib/core/utils.ts b/lib/core/utils.ts index ac4dc41..92812f8 100644 --- a/lib/core/utils.ts +++ b/lib/core/utils.ts @@ -22,3 +22,33 @@ export function split(argsWithOpts: any[]): { args: any[]; opts: Partial(obj1: T, obj2: U): T & U { + return Object.entries({ ...obj1, ...obj2 }).reduce((acc, [key, value]) => { + acc[key as keyof (T & U)] = ( + obj1[key as keyof T] !== undefined ? obj1[key as keyof T] : obj2[key as keyof U] + ) as any; + return acc; + }, {} as any); +} diff --git a/lib/resonate.ts b/lib/resonate.ts index a76e4e5..d4a0343 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -1,3 +1,4 @@ +import { TFC } from "./core/calls"; import { IEncoder } from "./core/encoder"; import { JSONEncoder } from "./core/encoders/json"; import { ResonatePromise } from "./core/future"; @@ -159,6 +160,8 @@ export abstract class ResonateBase { } } + run(tfc: TFC): ResonatePromise; + run(name: string, id: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise; /** * Run a Resonate function. Functions must first be registered with {@link register}. * @@ -168,53 +171,67 @@ export abstract class ResonateBase { * @param argsWithOpts The function arguments. * @returns A promise that resolve to the function return value. */ - run(name: string, id: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise { - const { args, opts: givenOpts } = utils.split(argsWithOpts); - const { version, durable, idempotencyKeyFn, eidFn, tags, timeout } = givenOpts; + run(nameOrTfc: string | TFC, id?: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise { + let tfc!: TFC; + if (typeof nameOrTfc === "string") { + const { args, opts: givenOpts } = utils.split(argsWithOpts); + const { durable, eidFn, idempotencyKeyFn, retry, tags, timeout, version } = givenOpts; + + if (!id) { + throw new Error("Id was not set for a top level function call"); + } - if (!this.functions[name] || !this.functions[name][version || 0]) { - throw new Error(`Function ${name} version ${version} not registered`); + tfc = { + funcName: nameOrTfc, + id, + args, + optsOverrides: { + durable, + eidFn, + idempotencyKeyFn, + retry, + tags, + timeout, + version, + }, + }; + } else { + tfc = nameOrTfc; } - // the options registered with the function are the defaults - const { func, opts: registeredOpts } = this.functions[name][version || 0]; - - // only the following options can be overridden, this information is persisted - // in the durable promise and therefore not required on the recovery path - const override: Partial = {}; + return this._run(tfc); + } - if (durable !== undefined) { - override.durable = durable; - } + private _run(tfc: TFC): ResonatePromise { + // Sets the defaults for the optional fields of TFC + tfc.optsOverrides = tfc.optsOverrides ?? {}; + tfc.args = tfc.args ?? []; + tfc.optsOverrides.version = tfc.optsOverrides.version || 0; - if (idempotencyKeyFn !== undefined) { - override.idempotencyKeyFn = idempotencyKeyFn; + if (!this.functions[tfc.funcName] || !this.functions[tfc.funcName][tfc.optsOverrides.version]) { + throw new Error(`Function ${tfc.funcName} version ${tfc.optsOverrides.version} not registered`); } - if (eidFn !== undefined) { - override.eidFn = eidFn; - } + // the options registered with the function are the defaults + const { func, opts: registeredOpts } = this.functions[tfc.funcName][tfc.optsOverrides.version]; - if (tags !== undefined) { - override.tags = { ...registeredOpts.tags, ...tags, "resonate:invocation": "true" }; - } else { - override.tags = { ...registeredOpts.tags, "resonate:invocation": "true" }; - } + // merge defaults with override to get opts + const opts = utils.mergeObjects(tfc.optsOverrides, registeredOpts); - if (timeout !== undefined) { - override.timeout = timeout; - } + // We want to preserve the version that was registered with the function + // when calling the function with `version=0` we will find the latest + // registered version of a function, but we want to make sure the registered + // version is preserved. + opts.version = registeredOpts.version; - // merge defaults with override to get opts - const opts = { - ...registeredOpts, - ...override, - }; + // For tags we need to merge the objects themselves and add the + // resonate:invocation tag to identify a top level invocation + opts.tags = { ...registeredOpts.tags, ...tfc.optsOverrides.tags, "resonate:invocation": "true" }; // lock on top level is true by default opts.lock = opts.lock ?? true; - return this.execute(name, id, func, args, opts); + return this.execute(tfc.funcName, tfc.id, func, tfc.args, opts); } // Gets the registered options for a specific function and version diff --git a/test/async.test.ts b/test/async.test.ts index f0e05ad..d0f6105 100644 --- a/test/async.test.ts +++ b/test/async.test.ts @@ -10,6 +10,10 @@ describe("Functions: async", () => { return await ctx.run(func); } + async function runFC(ctx: Context, func: any) { + return await ctx.run({ func: func }); + } + function ordinarySuccess() { return "foo"; } @@ -30,10 +34,18 @@ describe("Functions: async", () => { return await ctx.run("success", ctx.options({ poll: 0 })); } + async function deferredSuccessRFC(ctx: Context) { + return await ctx.run({ funcName: "success", opts: ctx.options({ poll: 0 }) }); + } + async function deferredFailure(ctx: Context) { return await ctx.run("failure", ctx.options({ poll: 0 })); } + async function deferredFailureRFC(ctx: Context) { + return await ctx.run({ funcName: "failure", opts: ctx.options({ poll: 0 }) }); + } + test("success", async () => { const resonate = new Resonate({ timeout: 1000, @@ -46,6 +58,7 @@ describe("Functions: async", () => { }); resonate.register("run", run); + resonate.register("runFC", runFC); resonate.register("success", ordinarySuccess); resonate.register("successAsync", ordinarySuccessAsync); @@ -63,6 +76,12 @@ describe("Functions: async", () => { await resonate.run("run", "run.c", deferredSuccess), await resonate.run("success", "run.d"), await resonate.run("successAsync", "run.e"), + await resonate.run({ funcName: "run", id: "run.a", args: [ordinarySuccess] }), + await resonate.run({ funcName: "run", id: "run.b", args: [ordinarySuccessAsync] }), + await resonate.run({ funcName: "run", id: "run.c", args: [deferredSuccess] }), + await resonate.run({ funcName: "success", id: "run.d" }), + await resonate.run({ funcName: "successAsync", id: "run.e" }), + await resonate.run({ funcName: "runFC", id: "run.f", args: [deferredSuccessRFC] }), ]; expect(results.every((r) => r === "foo")).toBe(true); @@ -75,6 +94,7 @@ describe("Functions: async", () => { }); resonate.register("run", run); + resonate.register("runFC", runFC); resonate.register("failure", ordinaryFailure); resonate.register("failureAsync", ordinaryFailureAsync); @@ -91,10 +111,53 @@ describe("Functions: async", () => { () => resonate.run("run", "run.c", deferredFailure), () => resonate.run("failure", "run.d"), () => resonate.run("failureAsync", "run.e"), + () => resonate.run({ funcName: "run", id: "run.a", args: [ordinaryFailure] }), + () => resonate.run({ funcName: "run", id: "run.b", args: [ordinaryFailureAsync] }), + () => resonate.run({ funcName: "run", id: "run.c", args: [deferredFailure] }), + () => resonate.run({ funcName: "failure", id: "run.d" }), + () => resonate.run({ funcName: "failureAsync", id: "run.e" }), + () => resonate.run({ funcName: "runFC", id: "run.f", args: [deferredFailureRFC] }), ]; for (const f of functions) { await expect(f()).rejects.toThrow("foo"); } }); + test("FC and non FC apis produce same result", async () => { + const resonate = new Resonate({ + timeout: 1000, + retry: retry.linear(0, 3), + }); + + resonate.register("runFC", async (ctx: Context, arg1: string, arg2: string) => { + const a = await ctx.run({ + func: (ctx: Context, arg: string) => { + return arg; + }, + args: [arg1], + }); + const b = await ctx.run({ + func: (ctx: Context, arg: string) => { + return arg; + }, + args: [arg2], + }); + return a + b; + }); + + resonate.register("run", async (ctx: Context, arg1: string, arg2: string) => { + const a = await ctx.run((ctx: Context, arg: string) => { + return arg; + }, arg1); + const b = await ctx.run((ctx: Context, arg: string) => { + return arg; + }, arg2); + return a + b; + }); + + const fcResult = await resonate.run({ funcName: "runFC", id: "run.a", args: ["foo", "bar"] }); + const regularResult = await resonate.run("run", "run.b", "foo", "bar"); + + expect(fcResult).toBe(regularResult); + }); }); diff --git a/test/options.test.ts b/test/options.test.ts index 341db2f..25dcd33 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -137,7 +137,7 @@ describe("Options", () => { expect(top.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(top.lock).toBe(true); expect(top.poll).toBe(resonateOpts.poll); - expect(top.retry).toBe(resonateOpts.retry); + expect(top.retry).toBe(overrides.retry); expect(top.tags).toEqual({ ...resonateOpts.tags, ...overrides.tags, "resonate:invocation": "true" }); expect(top.timeout).toBe(overrides.timeout); expect(top.version).toBe(overrides.version); diff --git a/test/utils.test.ts b/test/utils.test.ts new file mode 100644 index 0000000..827a112 --- /dev/null +++ b/test/utils.test.ts @@ -0,0 +1,55 @@ +import { describe, test, expect, jest } from "@jest/globals"; +import { mergeObjects } from "../lib/core/utils"; + +jest.setTimeout(1000); + +describe("mergeObjects", () => { + test("merges two objects with non-overlapping keys", () => { + const obj1 = { a: 1, b: 2 }; + const obj2 = { c: 3, d: 4 }; + const result = mergeObjects(obj1, obj2); + expect(result).toEqual({ a: 1, b: 2, c: 3, d: 4 }); + }); + + test("prefers values from obj1 when keys overlap and neither is undefined", () => { + const obj1 = { a: 1, b: 2 }; + const obj2 = { b: 3, c: 4 }; + const result = mergeObjects(obj1, obj2); + expect(result).toEqual({ a: 1, b: 2, c: 4 }); + }); + + test("uses obj2 value when obj1 value is undefined", () => { + const obj1 = { a: 1, b: undefined as number | undefined }; + const obj2 = { b: 2, c: 3 }; + const result = mergeObjects(obj1, obj2); + expect(result).toEqual({ a: 1, b: 2, c: 3 }); + }); + + test("handles nested objects", () => { + const obj1 = { a: { x: 1 }, b: 2 }; + const obj2 = { a: { y: 2 }, c: 3 }; + const result = mergeObjects(obj1, obj2); + expect(result).toEqual({ a: { x: 1 }, b: 2, c: 3 }); + }); + + test("handles arrays", () => { + const obj1 = { a: [1, 2], b: 2 }; + const obj2 = { a: [3, 4], c: 3 }; + const result = mergeObjects(obj1, obj2); + expect(result).toEqual({ a: [1, 2], b: 2, c: 3 }); + }); + + test("handles empty objects", () => { + const obj1 = {}; + const obj2 = { a: 1 }; + const result = mergeObjects(obj1, obj2); + expect(result).toEqual({ a: 1 }); + }); + + test("handles objects with null values", () => { + const obj1 = { a: null, b: 2 }; + const obj2 = { a: 1, c: null }; + const result = mergeObjects(obj1, obj2); + expect(result).toEqual({ a: null, b: 2, c: null }); + }); +});