diff --git a/lib/core/promises/types.ts b/lib/core/promises/types.ts index b816cf1..c971d90 100644 --- a/lib/core/promises/types.ts +++ b/lib/core/promises/types.ts @@ -1,3 +1,7 @@ +import assert from "assert"; +import { IEncoder } from "../encoder"; +import { ErrorCodes, ResonateError } from "../errors"; + export type DurablePromiseRecord = { state: "PENDING" | "RESOLVED" | "REJECTED" | "REJECTED_CANCELED" | "REJECTED_TIMEDOUT"; id: string; @@ -51,3 +55,34 @@ export function isTimedoutPromise(p: DurablePromiseRecord): boolean { export function isCompletedPromise(p: DurablePromiseRecord): boolean { return ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state); } + +/** + * Handles a completed durable promise and returns its result. + * + * @param p - The DurablePromiseRecord to handle. + * @param encoder - An IEncoder instance used to decode the promise's data. + * @returns The decoded result of the promise if resolved, or null if pending. + * @throws {ResonateError} If the promise was rejected, canceled, or timed out. + * + * @remarks + * Users must handle the null return case, which could indicate either: + * 1. The promise is still pending, or + * 2. The promise completed with a null value. + * It's important to distinguish between these cases in the calling code if necessary. + */ +export function handleCompletedPromise(p: DurablePromiseRecord, encoder: IEncoder): R { + assert(p.state !== "PENDING", "Promise was pending when trying to handle its completion"); + switch (p.state) { + case "RESOLVED": + return encoder.decode(p.value.data) as R; + case "REJECTED": + throw encoder.decode(p.value.data); + case "REJECTED_CANCELED": + throw new ResonateError("Resonate function canceled", ErrorCodes.CANCELED, encoder.decode(p.value.data)); + case "REJECTED_TIMEDOUT": + throw new ResonateError( + `Resonate function timedout at ${new Date(p.timeout).toISOString()}`, + ErrorCodes.TIMEDOUT, + ); + } +} diff --git a/lib/resonate.ts b/lib/resonate.ts index ad0d7d5..3019242 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -4,7 +4,7 @@ import { ILogger } from "./core/logger"; import { Logger } from "./core/loggers/logger"; import { PartialOptions, Options, InvocationOverrides, ResonateOptions, options } from "./core/options"; import * as durablePromises from "./core/promises/promises"; -import { DurablePromiseRecord } from "./core/promises/types"; +import { DurablePromiseRecord, handleCompletedPromise } from "./core/promises/types"; import * as retryPolicies from "./core/retry"; import { runWithRetry } from "./core/retry"; import * as schedules from "./core/schedules/schedules"; @@ -616,24 +616,8 @@ export class Context { const runFunc = async (): Promise => { while (!this.#stopAllPolling) { const durablePromiseRecord: DurablePromiseRecord = await this.#resonate.promisesStore.get(storedPromise.id); - switch (durablePromiseRecord.state) { - case "RESOLVED": - return opts.encoder.decode(durablePromiseRecord.value.data) as R; - case "REJECTED": - throw opts.encoder.decode(durablePromiseRecord.value.data); - case "REJECTED_CANCELED": - throw new ResonateError( - "Resonate function canceled", - ErrorCodes.CANCELED, - opts.encoder.decode(durablePromiseRecord.value.data), - ); - case "REJECTED_TIMEDOUT": - throw new ResonateError( - `Resonate function timedout at ${new Date(durablePromiseRecord.timeout).toISOString()}`, - ErrorCodes.TIMEDOUT, - ); - case "PENDING": - break; + if (durablePromiseRecord.state !== "PENDING") { + return handleCompletedPromise(durablePromiseRecord, opts.encoder); } // TODO: Consider using exponential backoff instead. sleep(opts.pollFrequency); @@ -1030,22 +1014,8 @@ const _runFunc = async ( const { id, eid, opts } = ctx.invocationData; // If the promise that comes back from the server is already completed, resolve or reject right away. - switch (storedPromise.state) { - case "RESOLVED": - return opts.encoder.decode(storedPromise.value.data) as R; - case "REJECTED": - throw opts.encoder.decode(storedPromise.value.data); - case "REJECTED_CANCELED": - throw new ResonateError( - "Resonate function canceled", - ErrorCodes.CANCELED, - opts.encoder.decode(storedPromise.value.data), - ); - case "REJECTED_TIMEDOUT": - throw new ResonateError( - `Resonate function timedout at ${new Date(storedPromise.timeout).toISOString()}`, - ErrorCodes.TIMEDOUT, - ); + if (storedPromise.state !== "PENDING") { + return handleCompletedPromise(storedPromise, opts.encoder); } // storedPromise.state === "PENDING" @@ -1107,21 +1077,7 @@ const _runFunc = async ( // Because of eventual consistency and recovery paths it is possible that we get a // rejected promise even if we did call `resolve` on it or the other way around. // What should never happen is that we get a "PENDING" promise - switch (completedPromiseRecord.state) { - case "RESOLVED": - return value as R; - case "REJECTED": - throw error; - case "REJECTED_CANCELED": - throw new ResonateError("Resonate function canceled", ErrorCodes.CANCELED, error); - case "REJECTED_TIMEDOUT": - throw new ResonateError( - `Resonate function timedout at ${new Date(completedPromiseRecord.timeout).toISOString()}`, - ErrorCodes.TIMEDOUT, - ); - case "PENDING": - throw new Error("Unreachable"); - } + return handleCompletedPromise(completedPromiseRecord, opts.encoder); } catch (err) { if (err instanceof ResonateError && (err.code === ErrorCodes.CANCELED || err.code === ErrorCodes.TIMEDOUT)) { // Cancel and timeout errors, just forward them diff --git a/test/options.test.ts b/test/options.test.ts index 91e08fc..e66a761 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -1,14 +1,12 @@ import { describe, test, expect, jest } from "@jest/globals"; -import { Base64Encoder } from "../lib/core/encoders/base64"; -import { JSONEncoder } from "../lib/core/encoders/json"; import { Options, options } from "../lib/core/options"; import * as retry from "../lib/core/retry"; -import * as utils from "../lib/core/utils"; import * as a from "../lib/resonate"; jest.setTimeout(10000); async function aTest(ctx: a.Context, opts: Partial = {}) { + console.log({ optsOverrides: opts }); return [ ctx.invocationData.opts, ...(await ctx.run( @@ -20,7 +18,6 @@ async function aTest(ctx: a.Context, opts: Partial = {}) { describe("Options", () => { const resonateOpts = { - encoder: new JSONEncoder(), pollFrequency: 1000, retryPolicy: retry.exponential(), tags: { a: "a", b: "b", c: "c" }, @@ -30,7 +27,6 @@ describe("Options", () => { const overrides: Partial = { durable: false, eidFn: () => "eid", - encoder: new Base64Encoder(), idempotencyKeyFn: (_: string) => "idempotencyKey", shouldLock: false, pollFrequency: 2000, @@ -39,6 +35,7 @@ describe("Options", () => { timeout: 2000, version: 2, }; + // Note: eidFn, encoder and idempotencyKeyFn are not serializable, and are note checked in the tests // Note: we are disabling durable for all tests here // so that value returned from the run is not serialized. @@ -58,11 +55,8 @@ describe("Options", () => { // Most options defaults are set when created a resonate instance for (const opts of [top, middle, bottom]) { expect(opts.durable).toBe(false); - expect(opts.eidFn).toBe(utils.randomId); - expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.pollFrequency).toBe(resonateOpts.pollFrequency); - expect(opts.retryPolicy).toBe(resonateOpts.retryPolicy); + expect(opts.retryPolicy).toEqual(resonateOpts.retryPolicy); expect(opts.timeout).toBe(resonateOpts.timeout); expect(opts.version).toBe(1); } @@ -77,16 +71,16 @@ describe("Options", () => { }); test("registered options propagate down", async () => { - const [top, middle, bottom] = await resonate.run<[Options, Options, Options]>("test.2", `test.2.1`); + const a = await resonate.run<[Options, Options, Options]>("test.2", `test.2.1`); + console.log({ a }); + const [top, middle, bottom] = a; for (const opts of [top, middle, bottom]) { + console.log({ opts }); expect(opts.durable).toBe(overrides.durable); - expect(opts.eidFn).toBe(overrides.eidFn); - expect(opts.encoder).toBe(overrides.encoder); - expect(opts.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(opts.shouldLock).toBe(overrides.shouldLock); expect(opts.pollFrequency).toBe(overrides.pollFrequency); - expect(opts.retryPolicy).toBe(overrides.retryPolicy); + expect(opts.retryPolicy).toEqual(overrides.retryPolicy); expect(opts.timeout).toBe(overrides.timeout); expect(opts.version).toBe(overrides.version); } @@ -104,12 +98,9 @@ describe("Options", () => { // top level options expect(top.durable).toBe(false); - expect(top.eidFn).toBe(overrides.eidFn); - expect(top.encoder).toBe(resonateOpts.encoder); - expect(top.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(top.shouldLock).toBe(false); expect(top.pollFrequency).toBe(resonateOpts.pollFrequency); - expect(top.retryPolicy).toBe(overrides.retryPolicy); + expect(top.retryPolicy).toEqual(overrides.retryPolicy); expect(top.tags).toEqual({ ...resonateOpts.tags, ...overrides.tags, "resonate:invocation": "true" }); expect(top.timeout).toBe(overrides.timeout); expect(top.version).toBe(overrides.version); @@ -117,12 +108,9 @@ describe("Options", () => { // bottom level options for (const opts of bottom) { expect(opts.durable).toBe(false); - expect(opts.eidFn).toBe(utils.randomId); - expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.shouldLock).toBe(false); expect(opts.pollFrequency).toBe(resonateOpts.pollFrequency); - expect(opts.retryPolicy).toBe(resonateOpts.retryPolicy); + expect(opts.retryPolicy).toEqual(resonateOpts.retryPolicy); expect(opts.tags).toEqual(resonateOpts.tags); expect(opts.timeout).toBe(resonateOpts.timeout); expect(opts.version).toBe(overrides.version); @@ -139,23 +127,17 @@ describe("Options", () => { // middle options (overriden) expect(middle.durable).toBe(overrides.durable); - expect(middle.eidFn).toBe(overrides.eidFn); - expect(middle.encoder).toBe(overrides.encoder); - expect(middle.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(middle.shouldLock).toBe(overrides.shouldLock); expect(middle.pollFrequency).toBe(overrides.pollFrequency); - expect(middle.retryPolicy).toBe(overrides.retryPolicy); + expect(middle.retryPolicy).toEqual(overrides.retryPolicy); expect(middle.tags).toEqual({ ...resonateOpts.tags, ...overrides.tags }); expect(middle.timeout).toBe(overrides.timeout); // top and bottom options for (const opts of [top, bottom]) { expect(opts.durable).toBe(false); - expect(opts.eidFn).toBe(utils.randomId); - expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.pollFrequency).toBe(resonateOpts.pollFrequency); - expect(opts.retryPolicy).toBe(resonateOpts.retryPolicy); + expect(opts.retryPolicy).toEqual(resonateOpts.retryPolicy); expect(opts.timeout).toBe(resonateOpts.timeout); expect(opts.shouldLock).toBe(false); } diff --git a/test/userResources.test.ts b/test/userResources.test.ts index 8265c79..5937948 100644 --- a/test/userResources.test.ts +++ b/test/userResources.test.ts @@ -10,13 +10,12 @@ describe("User Defined Resources", () => { resonate.register("resource", async (ctx: Context, resourceVal: unknown) => { ctx.setResource("mock", resourceVal); - return ctx.getResource("mock"); + const resource = ctx.getResource("mock"); + expect(resource).toBe(resourceVal); }); const resourceVal = {}; - const handle = await resonate.invokeLocal("resource", "resource.0", resourceVal); - - await expect(handle.result()).resolves.toBe(resourceVal); + await resonate.invokeLocal("resource", "resource.0", resourceVal); }); test("Set a resource and get it deep in the context stack", async () => {