Skip to content

Commit

Permalink
Store the retry policy with the DurablePromise. (#123)
Browse files Browse the repository at this point in the history
With this change we start storing the retry policy with
the DurablePromise. This enables the invocation to use the
retry policy that was set when executing the function originally
instead of the default retry policy in the recovery path.
  • Loading branch information
avillega authored Jun 12, 2024
1 parent 6160891 commit cd7d6b7
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 6 deletions.
1 change: 1 addition & 0 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ class Scheduler {
const param = {
func: name,
version: opts.version,
retryPolicy: opts.retry,
args,
};

Expand Down
16 changes: 16 additions & 0 deletions lib/core/encoders/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ export class JSONEncoder implements IEncoder<unknown, string | undefined> {
}

return JSON.stringify(data, (_, value) => {
if (value === Infinity) {
return "Infinity";
}

if (value === -Infinity) {
return "-Infinity";
}

if (value instanceof AggregateError) {
return {
__type: "aggregate_error",
Expand Down Expand Up @@ -53,6 +61,14 @@ export class JSONEncoder implements IEncoder<unknown, string | undefined> {
}

return JSON.parse(data, (_, value) => {
if (value === "Infinity") {
return Infinity;
}

if (value === "-Infinity") {
return Infinity;
}

if (value?.__type === "aggregate_error") {
return Object.assign(new AggregateError(value.errors, value.message), value);
}
Expand Down
1 change: 0 additions & 1 deletion lib/core/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ export class Invocation<T> {
// - the current time plus the user provided relative time
// - the parent timeout
this.timeout = Math.min(this.createdOn + this.opts.timeout, this.parent?.timeout ?? Infinity);

this.retryPolicy = this.opts.retry;
}

Expand Down
48 changes: 48 additions & 0 deletions lib/core/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,44 @@ export type Never = {
kind: "never";
};

export function isRetryPolicy(value: unknown): value is RetryPolicy {
// Check if the value is an object
if (typeof value !== "object" || value === null) {
return false;
}

// Check if the object has a 'kind' property and if its value is a valid kind string
const kindValue = (value as RetryPolicy).kind;
if (kindValue !== "exponential" && kindValue !== "linear" && kindValue !== "never") {
return false;
}

// Check if the object matches the corresponding type based on the 'kind' value
switch (kindValue) {
case "exponential":
return (
"initialDelayMs" in value &&
"backoffFactor" in value &&
"maxAttempts" in value &&
"maxDelayMs" in value &&
typeof (value as Exponential).initialDelayMs === "number" &&
typeof (value as Exponential).backoffFactor === "number" &&
typeof (value as Exponential).maxAttempts === "number" &&
typeof (value as Exponential).maxDelayMs === "number"
);
case "linear":
return (
"delayMs" in value &&
"maxAttempts" in value &&
typeof (value as Linear).delayMs === "number" &&
typeof (value as Linear).maxAttempts === "number"
);
case "never":
return true; // No additional properties to check for 'never' type
default:
return false; // unreachable
}
}
export function exponential(
initialDelayMs: number = 100,
backoffFactor: number = 2,
Expand Down Expand Up @@ -45,6 +83,16 @@ export function never(): Never {
return { kind: "never" };
}

/**
* Returns an iterable iterator that yields delay values for each retry attempt,
* based on the specified retry policy.
* The iterator stops yielding delay values when either the timeout is reached or the maximum
* number of attempts is exceeded.
*
* @param ctx - The context object containing the retry policy, attempt number, and timeout.
* @returns An iterable iterator that yields delay values for each retry attempt.
*
*/
export function retryIterator<T extends { retryPolicy: RetryPolicy; attempt: number; timeout: number }>(
ctx: T,
): IterableIterator<number> {
Expand Down
12 changes: 9 additions & 3 deletions lib/resonate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export abstract class ResonateBase {
}

const {
opts: { version, timeout, tags: promiseTags },
opts: { retry, version, timeout, tags: promiseTags },
} = this.functions[funcName][opts.version];

const idempotencyKey =
Expand All @@ -248,6 +248,7 @@ export abstract class ResonateBase {
const promiseParam = {
func: funcName,
version,
retryPolicy: retry,
args,
};

Expand All @@ -269,12 +270,14 @@ export abstract class ResonateBase {
}

/**
* Start the resonate service.
* Start the resonate service which continually checks for pending promises
* every `delay` ms.
*
* @param delay Frequency in ms to check for pending promises.
*/
start(delay: number = 5000) {
clearInterval(this.interval);
this._start();
this.interval = setInterval(() => this._start(), delay);
}

Expand Down Expand Up @@ -328,9 +331,12 @@ export abstract class ResonateBase {
"version" in param &&
typeof param.version === "number" &&
"args" in param &&
Array.isArray(param.args)
Array.isArray(param.args) &&
"retryPolicy" in param &&
retryPolicy.isRetryPolicy(param.retryPolicy)
) {
const { func, opts } = this.functions[param.func][param.version];
opts.retry = param.retryPolicy;
this.execute(param.func, promise.id, func, param.args, opts, opts, promise);
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/resonate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ describe("Resonate", () => {
expect(() => schedule(resonate, "baz", "", () => {})).toThrow("Function baz version 1 already registered");

register(resonate, "qux", () => {});
expect(() => resonate.schedule("qux", "x", "qux")).rejects.toThrow();
expect(() => resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow();
expect(resonate.schedule("qux", "x", "qux")).rejects.toThrow();
expect(resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow();

// delete the schedules in order to stop the local
// store interval that creates promises
Expand Down

0 comments on commit cd7d6b7

Please sign in to comment.