diff --git a/lib/async.ts b/lib/async.ts index cf81f6b..b8dbdd4 100644 --- a/lib/async.ts +++ b/lib/async.ts @@ -448,6 +448,7 @@ class Scheduler { const param = { func: name, version: opts.version, + retryPolicy: opts.retry, args, }; diff --git a/lib/core/encoders/json.ts b/lib/core/encoders/json.ts index 08ee92d..fd627a4 100644 --- a/lib/core/encoders/json.ts +++ b/lib/core/encoders/json.ts @@ -10,6 +10,14 @@ export class JSONEncoder implements IEncoder { } return JSON.stringify(data, (_, value) => { + if (value === Infinity) { + return "Infinity"; + } + + if (value === -Infinity) { + return "-Infinity"; + } + if (value instanceof AggregateError) { return { __type: "aggregate_error", @@ -53,6 +61,14 @@ export class JSONEncoder implements IEncoder { } return JSON.parse(data, (_, value) => { + if (value === "Infinity") { + return Infinity; + } + + if (value === "-Infinity") { + return Infinity; + } + if (value?.__type === "aggregate_error") { return Object.assign(new AggregateError(value.errors, value.message), value); } diff --git a/lib/core/invocation.ts b/lib/core/invocation.ts index 435fc4b..f98b273 100644 --- a/lib/core/invocation.ts +++ b/lib/core/invocation.ts @@ -71,7 +71,6 @@ export class Invocation { // - the current time plus the user provided relative time // - the parent timeout this.timeout = Math.min(this.createdOn + this.opts.timeout, this.parent?.timeout ?? Infinity); - this.retryPolicy = this.opts.retry; } diff --git a/lib/core/retry.ts b/lib/core/retry.ts index f7d758f..2b86166 100644 --- a/lib/core/retry.ts +++ b/lib/core/retry.ts @@ -18,6 +18,44 @@ export type Never = { kind: "never"; }; +export function isRetryPolicy(value: unknown): value is RetryPolicy { + // Check if the value is an object + if (typeof value !== "object" || value === null) { + return false; + } + + // Check if the object has a 'kind' property and if its value is a valid kind string + const kindValue = (value as RetryPolicy).kind; + if (kindValue !== "exponential" && kindValue !== "linear" && kindValue !== "never") { + return false; + } + + // Check if the object matches the corresponding type based on the 'kind' value + switch (kindValue) { + case "exponential": + return ( + "initialDelayMs" in value && + "backoffFactor" in value && + "maxAttempts" in value && + "maxDelayMs" in value && + typeof (value as Exponential).initialDelayMs === "number" && + typeof (value as Exponential).backoffFactor === "number" && + typeof (value as Exponential).maxAttempts === "number" && + typeof (value as Exponential).maxDelayMs === "number" + ); + case "linear": + return ( + "delayMs" in value && + "maxAttempts" in value && + typeof (value as Linear).delayMs === "number" && + typeof (value as Linear).maxAttempts === "number" + ); + case "never": + return true; // No additional properties to check for 'never' type + default: + return false; // unreachable + } +} export function exponential( initialDelayMs: number = 100, backoffFactor: number = 2, @@ -45,6 +83,16 @@ export function never(): Never { return { kind: "never" }; } +/** + * Returns an iterable iterator that yields delay values for each retry attempt, + * based on the specified retry policy. + * The iterator stops yielding delay values when either the timeout is reached or the maximum + * number of attempts is exceeded. + * + * @param ctx - The context object containing the retry policy, attempt number, and timeout. + * @returns An iterable iterator that yields delay values for each retry attempt. + * + */ export function retryIterator( ctx: T, ): IterableIterator { diff --git a/lib/resonate.ts b/lib/resonate.ts index 723a743..26157cb 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -239,7 +239,7 @@ export abstract class ResonateBase { } const { - opts: { version, timeout, tags: promiseTags }, + opts: { retry, version, timeout, tags: promiseTags }, } = this.functions[funcName][opts.version]; const idempotencyKey = @@ -248,6 +248,7 @@ export abstract class ResonateBase { const promiseParam = { func: funcName, version, + retryPolicy: retry, args, }; @@ -269,12 +270,14 @@ export abstract class ResonateBase { } /** - * Start the resonate service. + * Start the resonate service which continually checks for pending promises + * every `delay` ms. * * @param delay Frequency in ms to check for pending promises. */ start(delay: number = 5000) { clearInterval(this.interval); + this._start(); this.interval = setInterval(() => this._start(), delay); } @@ -328,9 +331,12 @@ export abstract class ResonateBase { "version" in param && typeof param.version === "number" && "args" in param && - Array.isArray(param.args) + Array.isArray(param.args) && + "retryPolicy" in param && + retryPolicy.isRetryPolicy(param.retryPolicy) ) { const { func, opts } = this.functions[param.func][param.version]; + opts.retry = param.retryPolicy; this.execute(param.func, promise.id, func, param.args, opts, opts, promise); } } diff --git a/test/resonate.test.ts b/test/resonate.test.ts index 8c9197b..6a075bd 100644 --- a/test/resonate.test.ts +++ b/test/resonate.test.ts @@ -183,8 +183,8 @@ describe("Resonate", () => { expect(() => schedule(resonate, "baz", "", () => {})).toThrow("Function baz version 1 already registered"); register(resonate, "qux", () => {}); - expect(() => resonate.schedule("qux", "x", "qux")).rejects.toThrow(); - expect(() => resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow(); + expect(resonate.schedule("qux", "x", "qux")).rejects.toThrow(); + expect(resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow(); // delete the schedules in order to stop the local // store interval that creates promises