Skip to content

Commit

Permalink
Add user resources (dependency injection) and finalizers (#135)
Browse files Browse the repository at this point in the history
* Add user resources (dependency injection) and finalizers

* Add doc comments to setResource and getResource

* Add more tests for userResources and document that finalizers must not fail

* Remove missleading comment
  • Loading branch information
avillega authored Jul 26, 2024
1 parent 3110fa2 commit b9d9f4c
Show file tree
Hide file tree
Showing 5 changed files with 924 additions and 496 deletions.
13 changes: 10 additions & 3 deletions lib/core/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,27 @@ export function retryIterator<T extends { retryPolicy: RetryPolicy; attempt: num
};
}

export async function runWithRetry<T>(func: () => Promise<T>, retryPolicy: RetryPolicy, timeout: number) {
export async function runWithRetry<T>(
func: () => Promise<T>,
onRetry: () => Promise<void>,
retryPolicy: RetryPolicy,
timeout: number,
) {
let error;

const ctx = { attempt: 0, retryPolicy, timeout };

// invoke the function according to the retry policy
for (const delay of retryIterator(ctx)) {
await new Promise((resolve) => setTimeout(resolve, delay));

if (ctx.attempt > 0) {
await onRetry();
}

try {
return await func();
} catch (e) {
error = e;

// bump the attempt count
ctx.attempt++;
}
Expand Down
105 changes: 91 additions & 14 deletions lib/resonate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export type Params<F> = F extends (ctx: any, ...args: infer P) => any ? P : neve
// The return type of a resonate function
export type Return<F> = F extends (...args: any[]) => infer T ? Awaited<T> : never;

//////////////////////////////////////////////////////////////////////

export class Resonate {
#registeredFunctions: Record<string, Record<number, { func: Func; opts: Options }>> = {};
#invocationHandles: Map<string, InvocationHandle<any>>;
Expand Down Expand Up @@ -356,14 +358,12 @@ export class Resonate {
const ctx = Context.createRootContext(this, { id, eid, name, opts });

// we need to hold on to a boolean to determine if the function was successful,
// we cannot rely on the value or error as these values could be undefined
// we cannot rely on the value or error since func could return undefined.
let success = true;
try {
value = await runWithRetry(
async () => {
ctx.reset();
return await func(ctx, ...args);
},
async () => await func(ctx, ...args), //func
async () => await ctx.onRetry(), //onRetry
opts.retryPolicy,
storedPromise.timeout,
);
Expand Down Expand Up @@ -529,6 +529,8 @@ export class Context {
#invocationHandles: Map<string, InvocationHandle<any>>;
#aborted: boolean;
#abortCause: any;
#resources: Map<string, any>;
#finalizers: (() => Promise<void>)[];
childrenCount: number;
readonly invocationData: InvocationData;
parent: Context | undefined;
Expand All @@ -537,6 +539,8 @@ export class Context {
private constructor(resonate: Resonate, invocationData: InvocationData, parent: Context | undefined) {
this.#resonate = resonate;
this.#invocationHandles = new Map();
this.#resources = new Map();
this.#finalizers = [];
this.#aborted = false;
this.parent = parent;
this.root = !parent ? this : parent.root;
Expand All @@ -552,12 +556,24 @@ export class Context {
return new Context(parentCtx.#resonate, invocationData, parentCtx);
}

reset() {
async onRetry(): Promise<void> {
this.childrenCount = 0;
await this.finalize();
}

async finalize() {
// It is important to await all promises before finalizing the resources
// doing it the other way around could cause problems
await Promise.allSettled(Array.from(this.#invocationHandles, ([_, handle]) => handle.result()));

// We need to run the finalizers in reverse insertion order since later set finalizers might have
// a dependency in early set resources
for (const finalizer of this.#finalizers.reverse()) {
await finalizer();
}

this.#resources.clear();
this.#finalizers = [];
}

abort(cause: any) {
Expand All @@ -573,6 +589,71 @@ export class Context {
return this.#abortCause;
}

/**
* Adds a finalizer function to be executed at the end of the current context.
* Finalizers are run in reverse order of their definition (last-in, first-out).
*
* @param fn - An asynchronous function to be executed as a finalizer.
* It should return a Promise that resolves to void.
*
* @remarks
* Finalizer functions must be non fallible.
*/
addFinalizer(fn: () => Promise<void>) {
this.#finalizers.push(fn);
}

/**
* Sets a named resource for the current context and optionally adds a finalizer.
*
* @param name - A unique string identifier for the resource.
* @param resource - The resource to be stored. Can be of any type.
* @param finalizer - Optional. An asynchronous function to be executed when the context ends.
* Finalizers are run in reverse order of their addition to the context and
* must not fail.
* @throws {Error} Throws an error if a resource with the same name already exists in the current context.
*
* This method associates a resource with a unique name in the current context.
* If a finalizer is provided, it will be executed when the context ends.
* Finalizers are useful for cleanup operations, such as closing connections or freeing resources.
*/
setResource(name: string, resource: any, finalizer?: () => Promise<void>): void {
if (this.#resources.has(name)) {
throw new Error("Resource already set for this context");
}

this.#resources.set(name, resource);
if (finalizer) {
this.#finalizers.push(finalizer);
}
}

/**
* Retrieves a resource by name from the current context or its parent contexts.
*
* @template R - The expected type of the resource.
* @param name - The unique string identifier of the resource to retrieve.
* @returns The resource of type R if found, or undefined if not found.
*
* This method searches for a resource in the following order:
* 1. In the current context.
* 2. If not found, it recursively searches in parent contexts.
* 3. Returns undefined if the resource is not found in any context.
*
* @remarks
* The method uses type assertion to cast the resource to type R.
* Ensure that the type parameter R matches the actual type of the stored resource
* to avoid runtime type errors.
*/
getResource<R>(name: string): R | undefined {
const resource = this.#resources.get(name);
if (resource) {
return resource as R;
}

return this.parent ? this.parent.getResource<R>(name) : undefined;
}

/**
* Invoke a remote function.
*
Expand Down Expand Up @@ -709,10 +790,8 @@ export class Context {
const ctx = Context.createChildrenContext(this, { name, id, eid, opts });
const timeout = Date.now() + opts.timeout;
return (await runWithRetry(
async () => {
ctx.reset();
return await func(ctx, ...args);
},
async () => await func(ctx, ...args),
async () => await ctx.onRetry(),
opts.retryPolicy,
timeout,
)) as R;
Expand Down Expand Up @@ -789,10 +868,8 @@ export class Context {
let success = true;
try {
value = await runWithRetry(
async () => {
ctx.reset();
return await func(ctx, ...args);
},
async () => await func(ctx, ...args),
async () => await ctx.onRetry(),
opts.retryPolicy,
storedPromise.timeout,
);
Expand Down
Loading

0 comments on commit b9d9f4c

Please sign in to comment.