Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store the retry policy with the DurablePromise. #123

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ class Scheduler {
const param = {
func: name,
version: opts.version,
retryPolicy: opts.retry,
args,
};

Expand Down
16 changes: 16 additions & 0 deletions lib/core/encoders/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ export class JSONEncoder implements IEncoder<unknown, string | undefined> {
}

return JSON.stringify(data, (_, value) => {
if (value === Infinity) {
return "Infinity";
}

if (value === -Infinity) {
return "-Infinity";
}

if (value instanceof AggregateError) {
return {
__type: "aggregate_error",
Expand Down Expand Up @@ -53,6 +61,14 @@ export class JSONEncoder implements IEncoder<unknown, string | undefined> {
}

return JSON.parse(data, (_, value) => {
if (value === "Infinity") {
return Infinity;
}

if (value === "-Infinity") {
return Infinity;
}

if (value?.__type === "aggregate_error") {
return Object.assign(new AggregateError(value.errors, value.message), value);
}
Expand Down
1 change: 0 additions & 1 deletion lib/core/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ export class Invocation<T> {
// - the current time plus the user provided relative time
// - the parent timeout
this.timeout = Math.min(this.createdOn + this.opts.timeout, this.parent?.timeout ?? Infinity);

this.retryPolicy = this.opts.retry;
}

Expand Down
48 changes: 48 additions & 0 deletions lib/core/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,44 @@ export type Never = {
kind: "never";
};

export function isRetryPolicy(value: unknown): value is RetryPolicy {
// Check if the value is an object
if (typeof value !== "object" || value === null) {
return false;
}

// Check if the object has a 'kind' property and if its value is a valid kind string
const kindValue = (value as RetryPolicy).kind;
if (kindValue !== "exponential" && kindValue !== "linear" && kindValue !== "never") {
return false;
}

// Check if the object matches the corresponding type based on the 'kind' value
switch (kindValue) {
case "exponential":
return (
"initialDelayMs" in value &&
"backoffFactor" in value &&
"maxAttempts" in value &&
"maxDelayMs" in value &&
typeof (value as Exponential).initialDelayMs === "number" &&
typeof (value as Exponential).backoffFactor === "number" &&
typeof (value as Exponential).maxAttempts === "number" &&
typeof (value as Exponential).maxDelayMs === "number"
);
case "linear":
return (
"delayMs" in value &&
"maxAttempts" in value &&
typeof (value as Linear).delayMs === "number" &&
typeof (value as Linear).maxAttempts === "number"
);
case "never":
return true; // No additional properties to check for 'never' type
default:
return false; // unreachable
}
}
export function exponential(
initialDelayMs: number = 100,
backoffFactor: number = 2,
Expand Down Expand Up @@ -45,6 +83,16 @@ export function never(): Never {
return { kind: "never" };
}

/**
* Returns an iterable iterator that yields delay values for each retry attempt,
* based on the specified retry policy.
* The iterator stops yielding delay values when either the timeout is reached or the maximum
* number of attempts is exceeded.
*
* @param ctx - The context object containing the retry policy, attempt number, and timeout.
* @returns An iterable iterator that yields delay values for each retry attempt.
*
*/
export function retryIterator<T extends { retryPolicy: RetryPolicy; attempt: number; timeout: number }>(
ctx: T,
): IterableIterator<number> {
Expand Down
12 changes: 9 additions & 3 deletions lib/resonate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export abstract class ResonateBase {
}

const {
opts: { version, timeout, tags: promiseTags },
opts: { retry, version, timeout, tags: promiseTags },
} = this.functions[funcName][opts.version];

const idempotencyKey =
Expand All @@ -248,6 +248,7 @@ export abstract class ResonateBase {
const promiseParam = {
func: funcName,
version,
retryPolicy: retry,
args,
};

Expand All @@ -269,12 +270,14 @@ export abstract class ResonateBase {
}

/**
* Start the resonate service.
* Start the resonate service which continually checks for pending promises
* every `delay` ms.
*
* @param delay Frequency in ms to check for pending promises.
*/
start(delay: number = 5000) {
clearInterval(this.interval);
this._start();
this.interval = setInterval(() => this._start(), delay);
}

Expand Down Expand Up @@ -328,9 +331,12 @@ export abstract class ResonateBase {
"version" in param &&
typeof param.version === "number" &&
"args" in param &&
Array.isArray(param.args)
Array.isArray(param.args) &&
"retryPolicy" in param &&
retryPolicy.isRetryPolicy(param.retryPolicy)
) {
const { func, opts } = this.functions[param.func][param.version];
opts.retry = param.retryPolicy;
this.execute(param.func, promise.id, func, param.args, opts, opts, promise);
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/resonate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ describe("Resonate", () => {
expect(() => schedule(resonate, "baz", "", () => {})).toThrow("Function baz version 1 already registered");

register(resonate, "qux", () => {});
expect(() => resonate.schedule("qux", "x", "qux")).rejects.toThrow();
expect(() => resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow();
expect(resonate.schedule("qux", "x", "qux")).rejects.toThrow();
expect(resonate.schedule("qux", "* * * * * * *", "qux")).rejects.toThrow();

// delete the schedules in order to stop the local
// store interval that creates promises
Expand Down
Loading