From cd7d6b7207d8b58eddde81889207096239214d97 Mon Sep 17 00:00:00 2001 From: Andres Villegas Date: Wed, 12 Jun 2024 15:26:33 -0700 Subject: [PATCH] Store the retry policy with the DurablePromise. (#123) With this change we start storing the retry policy with the DurablePromise. This enables the invocation to use the retry policy that was set when executing the function originally instead of the default retry policy in the recovery path. --- lib/async.ts | 1 + lib/core/encoders/json.ts | 16 +++++++++++++ lib/core/invocation.ts | 1 - lib/core/retry.ts | 48 +++++++++++++++++++++++++++++++++++++++ lib/resonate.ts | 12 +++++++--- test/resonate.test.ts | 4 ++-- 6 files changed, 76 insertions(+), 6 deletions(-) diff --git a/lib/async.ts b/lib/async.ts index cf81f6b..b8dbdd4 100644 --- a/lib/async.ts +++ b/lib/async.ts @@ -448,6 +448,7 @@ class Scheduler { const param = { func: name, version: opts.version, + retryPolicy: opts.retry, args, }; diff --git a/lib/core/encoders/json.ts b/lib/core/encoders/json.ts index 08ee92d..fd627a4 100644 --- a/lib/core/encoders/json.ts +++ b/lib/core/encoders/json.ts @@ -10,6 +10,14 @@ export class JSONEncoder implements IEncoder { } return JSON.stringify(data, (_, value) => { + if (value === Infinity) { + return "Infinity"; + } + + if (value === -Infinity) { + return "-Infinity"; + } + if (value instanceof AggregateError) { return { __type: "aggregate_error", @@ -53,6 +61,14 @@ export class JSONEncoder implements IEncoder { } return JSON.parse(data, (_, value) => { + if (value === "Infinity") { + return Infinity; + } + + if (value === "-Infinity") { + return Infinity; + } + if (value?.__type === "aggregate_error") { return Object.assign(new AggregateError(value.errors, value.message), value); } diff --git a/lib/core/invocation.ts b/lib/core/invocation.ts index 435fc4b..f98b273 100644 --- a/lib/core/invocation.ts +++ b/lib/core/invocation.ts @@ -71,7 +71,6 @@ export class Invocation { // - the current time plus the user provided relative time // - the parent timeout this.timeout = Math.min(this.createdOn + this.opts.timeout, this.parent?.timeout ?? Infinity); - this.retryPolicy = this.opts.retry; } diff --git a/lib/core/retry.ts b/lib/core/retry.ts index f7d758f..2b86166 100644 --- a/lib/core/retry.ts +++ b/lib/core/retry.ts @@ -18,6 +18,44 @@ export type Never = { kind: "never"; }; +export function isRetryPolicy(value: unknown): value is RetryPolicy { + // Check if the value is an object + if (typeof value !== "object" || value === null) { + return false; + } + + // Check if the object has a 'kind' property and if its value is a valid kind string + const kindValue = (value as RetryPolicy).kind; + if (kindValue !== "exponential" && kindValue !== "linear" && kindValue !== "never") { + return false; + } + + // Check if the object matches the corresponding type based on the 'kind' value + switch (kindValue) { + case "exponential": + return ( + "initialDelayMs" in value && + "backoffFactor" in value && + "maxAttempts" in value && + "maxDelayMs" in value && + typeof (value as Exponential).initialDelayMs === "number" && + typeof (value as Exponential).backoffFactor === "number" && + typeof (value as Exponential).maxAttempts === "number" && + typeof (value as Exponential).maxDelayMs === "number" + ); + case "linear": + return ( + "delayMs" in value && + "maxAttempts" in value && + typeof (value as Linear).delayMs === "number" && + typeof (value as Linear).maxAttempts === "number" + ); + case "never": + return true; // No additional properties to check for 'never' type + default: + return false; // unreachable + } +} export function exponential( initialDelayMs: number = 100, backoffFactor: number = 2, @@ -45,6 +83,16 @@ export function never(): Never { return { kind: "never" }; } +/** + * Returns an iterable iterator that yields delay values for each retry attempt, + * based on the specified retry policy. + * The iterator stops yielding delay values when either the timeout is reached or the maximum + * number of attempts is exceeded. + * + * @param ctx - The context object containing the retry policy, attempt number, and timeout. + * @returns An iterable iterator that yields delay values for each retry attempt. + * + */ export function retryIterator( ctx: T, ): IterableIterator { diff --git a/lib/resonate.ts b/lib/resonate.ts index 723a743..26157cb 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -239,7 +239,7 @@ export abstract class ResonateBase { } const { - opts: { version, timeout, tags: promiseTags }, + opts: { retry, version, timeout, tags: promiseTags }, } = this.functions[funcName][opts.version]; const idempotencyKey = @@ -248,6 +248,7 @@ export abstract class ResonateBase { const promiseParam = { func: funcName, version, + retryPolicy: retry, args, }; @@ -269,12 +270,14 @@ export abstract class ResonateBase { } /** - * Start the resonate service. + * Start the resonate service which continually checks for pending promises + * every `delay` ms. * * @param delay Frequency in ms to check for pending promises. */ start(delay: number = 5000) { clearInterval(this.interval); + this._start(); this.interval = setInterval(() => this._start(), delay); } @@ -328,9 +331,12 @@ export abstract class ResonateBase { "version" in param && typeof param.version === "number" && "args" in param && - Array.isArray(param.args) + Array.isArray(param.args) && + "retryPolicy" in param && + retryPolicy.isRetryPolicy(param.retryPolicy) ) { const { func, opts } = this.functions[param.func][param.version]; + opts.retry = param.retryPolicy; this.execute(param.func, promise.id, func, param.args, opts, opts, promise); } } diff --git a/test/resonate.test.ts b/test/resonate.test.ts index 8c9197b..6a075bd 100644 --- a/test/resonate.test.ts +++ b/test/resonate.test.ts @@ -183,8 +183,8 @@ describe("Resonate", () => { expect(() => schedule(resonate, "baz", "", () => {})).toThrow("Function baz version 1 already registered"); register(resonate, "qux", () => {}); - expect(() => resonate.schedule("qux", "x", "qux")).rejects.toThrow(); - expect(() => resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow(); + expect(resonate.schedule("qux", "x", "qux")).rejects.toThrow(); + expect(resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow(); // delete the schedules in order to stop the local // store interval that creates promises