Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handleCompletePromise helper function #143

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions lib/core/promises/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import assert from "assert";
import { IEncoder } from "../encoder";
import { ErrorCodes, ResonateError } from "../errors";

export type DurablePromiseRecord = {
state: "PENDING" | "RESOLVED" | "REJECTED" | "REJECTED_CANCELED" | "REJECTED_TIMEDOUT";
id: string;
Expand Down Expand Up @@ -51,3 +55,34 @@ export function isTimedoutPromise(p: DurablePromiseRecord): boolean {
export function isCompletedPromise(p: DurablePromiseRecord): boolean {
return ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state);
}

/**
* Handles a completed durable promise and returns its result.
*
* @param p - The DurablePromiseRecord to handle.
* @param encoder - An IEncoder instance used to decode the promise's data.
* @returns The decoded result of the promise if resolved, or null if pending.
* @throws {ResonateError} If the promise was rejected, canceled, or timed out.
*
* @remarks
* Users must handle the null return case, which could indicate either:
* 1. The promise is still pending, or
* 2. The promise completed with a null value.
* It's important to distinguish between these cases in the calling code if necessary.
*/
export function handleCompletedPromise<R>(p: DurablePromiseRecord, encoder: IEncoder<unknown, string | undefined>): R {
assert(p.state !== "PENDING", "Promise was pending when trying to handle its completion");
switch (p.state) {
case "RESOLVED":
return encoder.decode(p.value.data) as R;
case "REJECTED":
throw encoder.decode(p.value.data);
case "REJECTED_CANCELED":
throw new ResonateError("Resonate function canceled", ErrorCodes.CANCELED, encoder.decode(p.value.data));
case "REJECTED_TIMEDOUT":
throw new ResonateError(
`Resonate function timedout at ${new Date(p.timeout).toISOString()}`,
ErrorCodes.TIMEDOUT,
);
}
}
56 changes: 6 additions & 50 deletions lib/resonate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ILogger } from "./core/logger";
import { Logger } from "./core/loggers/logger";
import { PartialOptions, Options, InvocationOverrides, ResonateOptions, options } from "./core/options";
import * as durablePromises from "./core/promises/promises";
import { DurablePromiseRecord } from "./core/promises/types";
import { DurablePromiseRecord, handleCompletedPromise } from "./core/promises/types";
import * as retryPolicies from "./core/retry";
import { runWithRetry } from "./core/retry";
import * as schedules from "./core/schedules/schedules";
Expand Down Expand Up @@ -616,24 +616,8 @@ export class Context {
const runFunc = async (): Promise<R> => {
while (!this.#stopAllPolling) {
const durablePromiseRecord: DurablePromiseRecord = await this.#resonate.promisesStore.get(storedPromise.id);
switch (durablePromiseRecord.state) {
case "RESOLVED":
return opts.encoder.decode(durablePromiseRecord.value.data) as R;
case "REJECTED":
throw opts.encoder.decode(durablePromiseRecord.value.data);
case "REJECTED_CANCELED":
throw new ResonateError(
"Resonate function canceled",
ErrorCodes.CANCELED,
opts.encoder.decode(durablePromiseRecord.value.data),
);
case "REJECTED_TIMEDOUT":
throw new ResonateError(
`Resonate function timedout at ${new Date(durablePromiseRecord.timeout).toISOString()}`,
ErrorCodes.TIMEDOUT,
);
case "PENDING":
break;
if (durablePromiseRecord.state !== "PENDING") {
return handleCompletedPromise(durablePromiseRecord, opts.encoder);
}
// TODO: Consider using exponential backoff instead.
sleep(opts.pollFrequency);
Expand Down Expand Up @@ -1030,22 +1014,8 @@ const _runFunc = async <R>(
const { id, eid, opts } = ctx.invocationData;

// If the promise that comes back from the server is already completed, resolve or reject right away.
switch (storedPromise.state) {
case "RESOLVED":
return opts.encoder.decode(storedPromise.value.data) as R;
case "REJECTED":
throw opts.encoder.decode(storedPromise.value.data);
case "REJECTED_CANCELED":
throw new ResonateError(
"Resonate function canceled",
ErrorCodes.CANCELED,
opts.encoder.decode(storedPromise.value.data),
);
case "REJECTED_TIMEDOUT":
throw new ResonateError(
`Resonate function timedout at ${new Date(storedPromise.timeout).toISOString()}`,
ErrorCodes.TIMEDOUT,
);
if (storedPromise.state !== "PENDING") {
return handleCompletedPromise(storedPromise, opts.encoder);
}

// storedPromise.state === "PENDING"
Expand Down Expand Up @@ -1107,21 +1077,7 @@ const _runFunc = async <R>(
// Because of eventual consistency and recovery paths it is possible that we get a
// rejected promise even if we did call `resolve` on it or the other way around.
// What should never happen is that we get a "PENDING" promise
switch (completedPromiseRecord.state) {
case "RESOLVED":
return value as R;
case "REJECTED":
throw error;
case "REJECTED_CANCELED":
throw new ResonateError("Resonate function canceled", ErrorCodes.CANCELED, error);
case "REJECTED_TIMEDOUT":
throw new ResonateError(
`Resonate function timedout at ${new Date(completedPromiseRecord.timeout).toISOString()}`,
ErrorCodes.TIMEDOUT,
);
case "PENDING":
throw new Error("Unreachable");
}
return handleCompletedPromise(completedPromiseRecord, opts.encoder);
} catch (err) {
if (err instanceof ResonateError && (err.code === ErrorCodes.CANCELED || err.code === ErrorCodes.TIMEDOUT)) {
// Cancel and timeout errors, just forward them
Expand Down
42 changes: 12 additions & 30 deletions test/options.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import { describe, test, expect, jest } from "@jest/globals";
import { Base64Encoder } from "../lib/core/encoders/base64";
import { JSONEncoder } from "../lib/core/encoders/json";
import { Options, options } from "../lib/core/options";
import * as retry from "../lib/core/retry";
import * as utils from "../lib/core/utils";
import * as a from "../lib/resonate";

jest.setTimeout(10000);

async function aTest(ctx: a.Context, opts: Partial<Options> = {}) {
console.log({ optsOverrides: opts });
return [
ctx.invocationData.opts,
...(await ctx.run(
Expand All @@ -20,7 +18,6 @@ async function aTest(ctx: a.Context, opts: Partial<Options> = {}) {

describe("Options", () => {
const resonateOpts = {
encoder: new JSONEncoder(),
pollFrequency: 1000,
retryPolicy: retry.exponential(),
tags: { a: "a", b: "b", c: "c" },
Expand All @@ -30,7 +27,6 @@ describe("Options", () => {
const overrides: Partial<Options> = {
durable: false,
eidFn: () => "eid",
encoder: new Base64Encoder(),
idempotencyKeyFn: (_: string) => "idempotencyKey",
shouldLock: false,
pollFrequency: 2000,
Expand All @@ -39,6 +35,7 @@ describe("Options", () => {
timeout: 2000,
version: 2,
};
// Note: eidFn, encoder and idempotencyKeyFn are not serializable, and are note checked in the tests

// Note: we are disabling durable for all tests here
// so that value returned from the run is not serialized.
Expand All @@ -58,11 +55,8 @@ describe("Options", () => {
// Most options defaults are set when created a resonate instance
for (const opts of [top, middle, bottom]) {
expect(opts.durable).toBe(false);
expect(opts.eidFn).toBe(utils.randomId);
expect(opts.encoder).toBe(resonateOpts.encoder);
expect(opts.idempotencyKeyFn).toBe(utils.hash);
expect(opts.pollFrequency).toBe(resonateOpts.pollFrequency);
expect(opts.retryPolicy).toBe(resonateOpts.retryPolicy);
expect(opts.retryPolicy).toEqual(resonateOpts.retryPolicy);
expect(opts.timeout).toBe(resonateOpts.timeout);
expect(opts.version).toBe(1);
}
Expand All @@ -77,16 +71,16 @@ describe("Options", () => {
});

test("registered options propagate down", async () => {
const [top, middle, bottom] = await resonate.run<[Options, Options, Options]>("test.2", `test.2.1`);
const a = await resonate.run<[Options, Options, Options]>("test.2", `test.2.1`);
console.log({ a });

const [top, middle, bottom] = a;
for (const opts of [top, middle, bottom]) {
console.log({ opts });
expect(opts.durable).toBe(overrides.durable);
expect(opts.eidFn).toBe(overrides.eidFn);
expect(opts.encoder).toBe(overrides.encoder);
expect(opts.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn);
expect(opts.shouldLock).toBe(overrides.shouldLock);
expect(opts.pollFrequency).toBe(overrides.pollFrequency);
expect(opts.retryPolicy).toBe(overrides.retryPolicy);
expect(opts.retryPolicy).toEqual(overrides.retryPolicy);
expect(opts.timeout).toBe(overrides.timeout);
expect(opts.version).toBe(overrides.version);
}
Expand All @@ -104,25 +98,19 @@ describe("Options", () => {

// top level options
expect(top.durable).toBe(false);
expect(top.eidFn).toBe(overrides.eidFn);
expect(top.encoder).toBe(resonateOpts.encoder);
expect(top.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn);
expect(top.shouldLock).toBe(false);
expect(top.pollFrequency).toBe(resonateOpts.pollFrequency);
expect(top.retryPolicy).toBe(overrides.retryPolicy);
expect(top.retryPolicy).toEqual(overrides.retryPolicy);
expect(top.tags).toEqual({ ...resonateOpts.tags, ...overrides.tags, "resonate:invocation": "true" });
expect(top.timeout).toBe(overrides.timeout);
expect(top.version).toBe(overrides.version);

// bottom level options
for (const opts of bottom) {
expect(opts.durable).toBe(false);
expect(opts.eidFn).toBe(utils.randomId);
expect(opts.encoder).toBe(resonateOpts.encoder);
expect(opts.idempotencyKeyFn).toBe(utils.hash);
expect(opts.shouldLock).toBe(false);
expect(opts.pollFrequency).toBe(resonateOpts.pollFrequency);
expect(opts.retryPolicy).toBe(resonateOpts.retryPolicy);
expect(opts.retryPolicy).toEqual(resonateOpts.retryPolicy);
expect(opts.tags).toEqual(resonateOpts.tags);
expect(opts.timeout).toBe(resonateOpts.timeout);
expect(opts.version).toBe(overrides.version);
Expand All @@ -139,23 +127,17 @@ describe("Options", () => {

// middle options (overriden)
expect(middle.durable).toBe(overrides.durable);
expect(middle.eidFn).toBe(overrides.eidFn);
expect(middle.encoder).toBe(overrides.encoder);
expect(middle.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn);
expect(middle.shouldLock).toBe(overrides.shouldLock);
expect(middle.pollFrequency).toBe(overrides.pollFrequency);
expect(middle.retryPolicy).toBe(overrides.retryPolicy);
expect(middle.retryPolicy).toEqual(overrides.retryPolicy);
expect(middle.tags).toEqual({ ...resonateOpts.tags, ...overrides.tags });
expect(middle.timeout).toBe(overrides.timeout);

// top and bottom options
for (const opts of [top, bottom]) {
expect(opts.durable).toBe(false);
expect(opts.eidFn).toBe(utils.randomId);
expect(opts.encoder).toBe(resonateOpts.encoder);
expect(opts.idempotencyKeyFn).toBe(utils.hash);
expect(opts.pollFrequency).toBe(resonateOpts.pollFrequency);
expect(opts.retryPolicy).toBe(resonateOpts.retryPolicy);
expect(opts.retryPolicy).toEqual(resonateOpts.retryPolicy);
expect(opts.timeout).toBe(resonateOpts.timeout);
expect(opts.shouldLock).toBe(false);
}
Expand Down
7 changes: 3 additions & 4 deletions test/userResources.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ describe("User Defined Resources", () => {

resonate.register("resource", async (ctx: Context, resourceVal: unknown) => {
ctx.setResource("mock", resourceVal);
return ctx.getResource("mock");
const resource = ctx.getResource("mock");
expect(resource).toBe(resourceVal);
});

const resourceVal = {};
const handle = await resonate.invokeLocal<void>("resource", "resource.0", resourceVal);

await expect(handle.result()).resolves.toBe(resourceVal);
await resonate.invokeLocal<void>("resource", "resource.0", resourceVal);
});

test("Set a resource and get it deep in the context stack", async () => {
Expand Down
Loading