Skip to content

Commit

Permalink
Add initial support for Function Call types (#129)
Browse files Browse the repository at this point in the history
* Add TFC

Initial implementation of TFC for `resonate.run`

* Add RFC and LFC and use them in Context.run overloads.

* Fix typo in doc

* Update lib/core/calls.ts

Co-authored-by: David Farr <[email protected]>

---------

Co-authored-by: David Farr <[email protected]>
  • Loading branch information
avillega and dfarr authored Jul 2, 2024
1 parent 4da10af commit 7cf1db2
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 54 deletions.
98 changes: 78 additions & 20 deletions lib/async.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Func, isRFC, LFC, Params, Return, RFC } from "./core/calls";
import { Execution, OrdinaryExecution, DeferredExecution } from "./core/execution";
import { ResonatePromise } from "./core/future";
import { Invocation } from "./core/invocation";
Expand All @@ -12,12 +13,6 @@ import { ResonateBase } from "./resonate";
// Types
/////////////////////////////////////////////////////////////////////

export type Func = (ctx: Context, ...args: any[]) => any;

export type Params<F> = F extends (ctx: any, ...args: infer P) => any ? P : never;

export type Return<F> = F extends (...args: any[]) => infer T ? Awaited<T> : never;

/////////////////////////////////////////////////////////////////////
// Resonate
/////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -183,6 +178,15 @@ export class Context {
*/
run<F extends Func>(func: F, ...args: [...Params<F>, PartialOptions?]): ResonatePromise<Return<F>>;

/**
* Invoke a Local Function Call (LFC).
*
* @template F - A type extending Function
* @param {LFC} lfc - The Local Function Call configuration
* @returns {ReturnType<F>} The result of the function execution
*/
run<F extends Func>(lfc: LFC<F>): ResonatePromise<Return<F>>;

/**
* Invoke a remote function.
*
Expand All @@ -195,62 +199,116 @@ export class Context {
run<T>(func: string, args: any, opts?: PartialOptions): ResonatePromise<T>;

/**
* Invoke a remote function.
* Invoke a Remote Function Call (RFC).
*
* @param {RFC} rfc - The Remote Function Call configuration
* @returns {ResonatePromise<T>} A promise that resolves with the result of the remote execution
*
* @description
* This function takes a Remote Function Call (RFC) configuration and executes the specified function
* remotely with the provided options.
*/
run<T>(rfc: RFC): ResonatePromise<T>;

/**
* Invoke a remote function without arguments.
*
* @template T The return type of the remote function.
* @param func The id of the remote function.
* @param opts Optional {@link options}.
* @returns A promise that resolves to the resolved value of the remote function.
*/
run<T>(func: string, opts?: PartialOptions): ResonatePromise<T>;
run(func: string | ((...args: any[]) => any), ...argsWithOpts: any[]): ResonatePromise<any> {

run<F extends Func, T>(funcOrFc: F | LFC<F> | RFC | string, ...argsWithOpts: any[]): ResonatePromise<Return<F> | T> {
// Function with args and possibly options
if (typeof funcOrFc === "function") {
const { args, opts } = utils.split(argsWithOpts);
const lfc: LFC<F> = {
func: funcOrFc,
args,
opts: opts,
};

return this._run<F, T>(lfc);
}

// String (function name) with args and possibly options
if (typeof funcOrFc === "string") {
const funcName = funcOrFc;
const { args, opts } = utils.split(argsWithOpts);

const rfc: RFC = {
funcName,
args,
opts,
};

return this._run<F, T>(rfc);
}

// We are sure it is an FC
return this._run<F, T>(funcOrFc);
}

private _run<F extends Func, T>(fc: LFC<F> | RFC): ResonatePromise<Return<F> | T> {
// the parent is the current invocation
const parent = this.invocation;

// human readable name of the function
let name!: string;

// the id is either:
// 1. a provided string in the case of a deferred execution
// 2. a generated string in the case of an ordinary execution
const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}.${func.name}`;
let id!: string;

// human readable name of the function
const name = typeof func === "string" ? func : func.name;
// Params to store with the durable promise. For RemoteExecutions we just encode the
// given arg(s). Otherwise we store nothing since it is unncesary.
let param: any | undefined;

if (isRFC(fc)) {
name = fc.funcName;
id = name;
param = fc.args;
} else {
name = fc.func.name;
id = `${parent.id}.${parent.counter}.${name}`;
param = undefined;
}

// opts are optional and can be provided as the last arg
const { args, opts: givenOpts } = utils.split(argsWithOpts);
const registeredOptions = this.resonate.registeredOptions(
this.invocation.root.name,
this.invocation.root.opts.version,
);

const resonateOptions = this.resonate.defaults();

const opts = {
...resonateOptions,
...registeredOptions,
...givenOpts,
...fc.opts,
};

// Merge the tags
opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...givenOpts.tags };
opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...fc.opts?.tags };

// Default lock is false for children execution
opts.lock = opts.lock ?? false;

// param is only required for deferred executions
const param = typeof func === "string" ? args[0] : undefined;

// create a new invocation
const invocation = new Invocation(name, id, undefined, param, opts, parent);

let execution: Execution<any>;
if (typeof func === "string") {
if (isRFC(fc)) {
// create a deferred execution
// this execution will be fulfilled out-of-process
execution = new DeferredExecution(this.resonate, invocation);
} else {
// create an ordinary execution
// this execution wraps a user-provided function
const ctx = new Context(this.resonate, invocation);
execution = new OrdinaryExecution(this.resonate, invocation, () => func(ctx, ...args));
execution = new OrdinaryExecution(this.resonate, invocation, () => fc.func(ctx, ...(fc.args ?? [])));
}

// bump the counter
Expand Down
52 changes: 52 additions & 0 deletions lib/core/calls.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Context } from "..";
import { Options } from "./options";

// The type of a resonate function
export type Func = (ctx: Context, ...args: any[]) => any;

// The args of a resonate function excluding the context argument
export type Params<F> = F extends (ctx: any, ...args: infer P) => any ? P : never;

// The return type of a resonate function
export type Return<F> = F extends (...args: any[]) => infer T ? Awaited<T> : never;

// A top level function call, to be used in the with `resonate.run`
export type TFC = {
// Fuction Name of the function to be called, has to be previously registered
funcName: string;
// Given id for this execution, ids have to be distinct between top level calls
id: string;
// args to be passed to the specified func.
args?: any[];
// opts to override the registered Options. Only the subset of options stored
// with the DurablePromise can be overriden that way we can have the same set
// of options when running in the recovery path.
optsOverrides?: Partial<
Pick<Options, "durable" | "eidFn" | "idempotencyKeyFn" | "retry" | "tags" | "timeout" | "version">
>;
};

// Remote function call
export type RFC = {
funcName: string;
args?: any;
opts?: Partial<Options>;
};

// Remote function call
export type LFC<F extends Func> = {
func: F;
args?: any[];
opts?: Partial<Options>;
};

// Type guard for RFC
export function isRFC(obj: any): obj is RFC {
return (
typeof obj === "object" &&
obj !== null &&
typeof obj.funcName === "string" &&
(obj.args === undefined || typeof obj.args === "object") &&
(obj.opts === undefined || (typeof obj.opts === "object" && obj.opts !== null))
);
}
30 changes: 30 additions & 0 deletions lib/core/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,33 @@ export function split(argsWithOpts: any[]): { args: any[]; opts: Partial<Options
? { args: argsWithOpts.slice(0, -1), opts: possibleOpts }
: { args: argsWithOpts, opts: {} };
}

/**
* Merges two objects, preferring values from the first object when both are defined.
* If a property is undefined in the first object, the value from the second object is used.
*
* @template T - Type of the first object
* @template U - Type of the second object
* @param {T} obj1 - The first object to merge
* @param {U} obj2 - The second object to merge
* @returns {T & U} A new object containing all properties from both input objects
*
* @example
* const obj1 = { a: 1, b: undefined };
* const obj2 = { b: 2, c: 3 };
* const result = mergeObjects(obj1, obj2);
* // result is { a: 1, b: 2, c: 3 }
*
* @remarks
* - Properties from obj1 take precedence over obj2 when both are defined.
* - The function creates a new object and does not modify the input objects.
* - Nested objects and arrays are not deeply merged, only their references are copied.
*/
export function mergeObjects<T extends object, U extends object>(obj1: T, obj2: U): T & U {
return Object.entries({ ...obj1, ...obj2 }).reduce((acc, [key, value]) => {
acc[key as keyof (T & U)] = (
obj1[key as keyof T] !== undefined ? obj1[key as keyof T] : obj2[key as keyof U]
) as any;
return acc;
}, {} as any);
}
83 changes: 50 additions & 33 deletions lib/resonate.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { TFC } from "./core/calls";
import { IEncoder } from "./core/encoder";
import { JSONEncoder } from "./core/encoders/json";
import { ResonatePromise } from "./core/future";
Expand Down Expand Up @@ -159,6 +160,8 @@ export abstract class ResonateBase {
}
}

run<T>(tfc: TFC): ResonatePromise<T>;
run<T>(name: string, id: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise<T>;
/**
* Run a Resonate function. Functions must first be registered with {@link register}.
*
Expand All @@ -168,53 +171,67 @@ export abstract class ResonateBase {
* @param argsWithOpts The function arguments.
* @returns A promise that resolve to the function return value.
*/
run<T>(name: string, id: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise<T> {
const { args, opts: givenOpts } = utils.split(argsWithOpts);
const { version, durable, idempotencyKeyFn, eidFn, tags, timeout } = givenOpts;
run<T>(nameOrTfc: string | TFC, id?: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise<T> {
let tfc!: TFC;
if (typeof nameOrTfc === "string") {
const { args, opts: givenOpts } = utils.split(argsWithOpts);
const { durable, eidFn, idempotencyKeyFn, retry, tags, timeout, version } = givenOpts;

if (!id) {
throw new Error("Id was not set for a top level function call");
}

if (!this.functions[name] || !this.functions[name][version || 0]) {
throw new Error(`Function ${name} version ${version} not registered`);
tfc = {
funcName: nameOrTfc,
id,
args,
optsOverrides: {
durable,
eidFn,
idempotencyKeyFn,
retry,
tags,
timeout,
version,
},
};
} else {
tfc = nameOrTfc;
}

// the options registered with the function are the defaults
const { func, opts: registeredOpts } = this.functions[name][version || 0];

// only the following options can be overridden, this information is persisted
// in the durable promise and therefore not required on the recovery path
const override: Partial<Options> = {};
return this._run(tfc);
}

if (durable !== undefined) {
override.durable = durable;
}
private _run<T>(tfc: TFC): ResonatePromise<T> {
// Sets the defaults for the optional fields of TFC
tfc.optsOverrides = tfc.optsOverrides ?? {};
tfc.args = tfc.args ?? [];
tfc.optsOverrides.version = tfc.optsOverrides.version || 0;

if (idempotencyKeyFn !== undefined) {
override.idempotencyKeyFn = idempotencyKeyFn;
if (!this.functions[tfc.funcName] || !this.functions[tfc.funcName][tfc.optsOverrides.version]) {
throw new Error(`Function ${tfc.funcName} version ${tfc.optsOverrides.version} not registered`);
}

if (eidFn !== undefined) {
override.eidFn = eidFn;
}
// the options registered with the function are the defaults
const { func, opts: registeredOpts } = this.functions[tfc.funcName][tfc.optsOverrides.version];

if (tags !== undefined) {
override.tags = { ...registeredOpts.tags, ...tags, "resonate:invocation": "true" };
} else {
override.tags = { ...registeredOpts.tags, "resonate:invocation": "true" };
}
// merge defaults with override to get opts
const opts = utils.mergeObjects(tfc.optsOverrides, registeredOpts);

if (timeout !== undefined) {
override.timeout = timeout;
}
// We want to preserve the version that was registered with the function
// when calling the function with `version=0` we will find the latest
// registered version of a function, but we want to make sure the registered
// version is preserved.
opts.version = registeredOpts.version;

// merge defaults with override to get opts
const opts = {
...registeredOpts,
...override,
};
// For tags we need to merge the objects themselves and add the
// resonate:invocation tag to identify a top level invocation
opts.tags = { ...registeredOpts.tags, ...tfc.optsOverrides.tags, "resonate:invocation": "true" };

// lock on top level is true by default
opts.lock = opts.lock ?? true;

return this.execute(name, id, func, args, opts);
return this.execute(tfc.funcName, tfc.id, func, tfc.args, opts);
}

// Gets the registered options for a specific function and version
Expand Down
Loading

0 comments on commit 7cf1db2

Please sign in to comment.