diff --git a/lib/core/promises/promises.ts b/lib/core/promises/promises.ts index ee23095..f9f1ae5 100644 --- a/lib/core/promises/promises.ts +++ b/lib/core/promises/promises.ts @@ -1,7 +1,7 @@ import { IEncoder } from "../encoder"; import { ErrorCodes, ResonateError } from "../errors"; import { IPromiseStore } from "../store"; -import { PendingPromise, ResolvedPromise, RejectedPromise, CanceledPromise, TimedoutPromise } from "./types"; +import { DurablePromiseRecord } from "./types"; /** * Durable Promise create options. @@ -69,7 +69,7 @@ export class DurablePromise { constructor( private store: IPromiseStore, private encoder: IEncoder, - private promise: PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise, + private promise: DurablePromiseRecord, ) { this.completed = new Promise((resolve) => { this.complete = resolve; diff --git a/lib/core/promises/types.ts b/lib/core/promises/types.ts index 02af121..b816cf1 100644 --- a/lib/core/promises/types.ts +++ b/lib/core/promises/types.ts @@ -1,45 +1,5 @@ -export type DurablePromise = PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise; - -export type PendingPromise = { - state: "PENDING"; - id: string; - timeout: number; - param: { - headers: Record | undefined; - data: string | undefined; - }; - value: { - headers: undefined; - data: undefined; - }; - createdOn: number; - completedOn: undefined; - idempotencyKeyForCreate: string | undefined; - idempotencyKeyForComplete: undefined; - tags: Record | undefined; -}; - -export type ResolvedPromise = { - state: "RESOLVED"; - id: string; - timeout: number; - param: { - headers: Record | undefined; - data: string | undefined; - }; - value: { - headers: Record | undefined; - data: string | undefined; - }; - createdOn: number; - completedOn: number; - idempotencyKeyForCreate: string | undefined; - idempotencyKeyForComplete: string | undefined; - tags: Record | undefined; -}; - -export type RejectedPromise = { - state: "REJECTED"; +export type DurablePromiseRecord = { + state: "PENDING" | "RESOLVED" | "REJECTED" | "REJECTED_CANCELED" | "REJECTED_TIMEDOUT"; id: string; timeout: number; param: { @@ -51,53 +11,14 @@ export type RejectedPromise = { data: string | undefined; }; createdOn: number; - completedOn: number; + completedOn: number | undefined; idempotencyKeyForCreate: string | undefined; idempotencyKeyForComplete: string | undefined; tags: Record | undefined; }; -export type CanceledPromise = { - state: "REJECTED_CANCELED"; - id: string; - timeout: number; - param: { - headers: Record | undefined; - data: string | undefined; - }; - value: { - headers: Record | undefined; - data: string | undefined; - }; - createdOn: number; - completedOn: number; - idempotencyKeyForCreate: string | undefined; - idempotencyKeyForComplete: string | undefined; - tags: Record | undefined; -}; - -export type TimedoutPromise = { - state: "REJECTED_TIMEDOUT"; - id: string; - timeout: number; - param: { - headers: Record | undefined; - data: string | undefined; - }; - value: { - headers: undefined; - data: undefined; - }; - createdOn: number; - completedOn: number; - idempotencyKeyForCreate: string | undefined; - idempotencyKeyForComplete: undefined; - tags: Record | undefined; -}; - -// Type guards - -export function isDurablePromise(p: unknown): p is DurablePromise { +// This is an unsound type guard, we should be more strict in what we call a DurablePromise +export function isDurablePromiseRecord(p: unknown): p is DurablePromiseRecord { return ( p !== null && typeof p === "object" && @@ -107,28 +28,26 @@ export function isDurablePromise(p: unknown): p is DurablePromise { ); } -export function isPendingPromise(p: unknown): p is PendingPromise { - return isDurablePromise(p) && p.state === "PENDING"; +export function isPendingPromise(p: DurablePromiseRecord): boolean { + return p.state === "PENDING"; } -export function isResolvedPromise(p: unknown): p is ResolvedPromise { - return isDurablePromise(p) && p.state === "RESOLVED"; +export function isResolvedPromise(p: DurablePromiseRecord): boolean { + return p.state === "RESOLVED"; } -export function isRejectedPromise(p: unknown): p is RejectedPromise { - return isDurablePromise(p) && p.state === "REJECTED"; +export function isRejectedPromise(p: DurablePromiseRecord): boolean { + return p.state === "REJECTED"; } -export function isCanceledPromise(p: unknown): p is CanceledPromise { - return isDurablePromise(p) && p.state === "REJECTED_CANCELED"; +export function isCanceledPromise(p: DurablePromiseRecord): boolean { + return p.state === "REJECTED_CANCELED"; } -export function isTimedoutPromise(p: unknown): p is TimedoutPromise { - return isDurablePromise(p) && p.state === "REJECTED_TIMEDOUT"; +export function isTimedoutPromise(p: DurablePromiseRecord): boolean { + return p.state === "REJECTED_TIMEDOUT"; } -export function isCompletedPromise( - p: unknown, -): p is ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise { - return isDurablePromise(p) && ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state); +export function isCompletedPromise(p: DurablePromiseRecord): boolean { + return ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state); } diff --git a/lib/core/storages/memory.ts b/lib/core/storages/memory.ts index a33b4b6..d49603f 100644 --- a/lib/core/storages/memory.ts +++ b/lib/core/storages/memory.ts @@ -3,6 +3,7 @@ import { IStorage } from "../storage"; export class MemoryStorage implements IStorage { private items: Record = {}; + // read-modify-write async rmw(id: string, func: (item: T | undefined) => X): Promise { const item = func(this.items[id]); if (item) { @@ -12,6 +13,7 @@ export class MemoryStorage implements IStorage { return item; } + // read-modify-delete async rmd(id: string, func: (item: T) => boolean): Promise { const item = this.items[id]; let result = false; diff --git a/lib/core/storages/withTimeout.ts b/lib/core/storages/withTimeout.ts index ff61a5e..063777c 100644 --- a/lib/core/storages/withTimeout.ts +++ b/lib/core/storages/withTimeout.ts @@ -1,28 +1,32 @@ -import { DurablePromise, ResolvedPromise, TimedoutPromise, isPendingPromise } from "../promises/types"; +import { DurablePromiseRecord, isPendingPromise } from "../promises/types"; import { IStorage } from "../storage"; import { MemoryStorage } from "./memory"; -export class WithTimeout implements IStorage { - constructor(private storage: IStorage = new MemoryStorage()) {} +export class WithTimeout implements IStorage { + constructor(private storage: IStorage = new MemoryStorage()) {} - rmw(id: string, func: (item: DurablePromise | undefined) => T): Promise { + rmw( + id: string, + func: (item: DurablePromiseRecord | undefined) => T, + ): Promise { return this.storage.rmw(id, (p) => func(p ? timeout(p) : undefined)); } - rmd(id: string, func: (item: DurablePromise) => boolean): Promise { + rmd(id: string, func: (item: DurablePromiseRecord) => boolean): Promise { return this.storage.rmd(id, (p) => func(timeout(p))); } - async *all(): AsyncGenerator { + async *all(): AsyncGenerator { for await (const promises of this.storage.all()) { yield promises.map(timeout); } } } -function timeout(promise: T): T | ResolvedPromise | TimedoutPromise { +function timeout(promise: T): DurablePromiseRecord { if (isPendingPromise(promise) && Date.now() >= promise.timeout) { - const body = { + return { + state: promise.tags?.["resonate:timeout"] === "true" ? "RESOLVED" : "REJECTED_TIMEDOUT", id: promise.id, timeout: promise.timeout, param: promise.param, @@ -36,18 +40,6 @@ function timeout(promise: T): T | ResolvedPromise | Ti idempotencyKeyForComplete: undefined, tags: promise.tags, }; - - if (promise.tags?.["resonate:timeout"] === "true") { - return { - state: "RESOLVED", - ...body, - }; - } else { - return { - state: "REJECTED_TIMEDOUT", - ...body, - }; - } } return promise; diff --git a/lib/core/store.ts b/lib/core/store.ts index 0a8de6f..51745a2 100644 --- a/lib/core/store.ts +++ b/lib/core/store.ts @@ -1,11 +1,4 @@ -import { - DurablePromise, - PendingPromise, - ResolvedPromise, - RejectedPromise, - CanceledPromise, - TimedoutPromise, -} from "./promises/types"; +import { DurablePromiseRecord } from "./promises/types"; import { Schedule } from "./schedules/types"; @@ -42,7 +35,7 @@ export interface IPromiseStore { data: string | undefined, timeout: number, tags: Record | undefined, - ): Promise; + ): Promise; /** * Cancels a new promise. @@ -60,7 +53,7 @@ export interface IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise; + ): Promise; /** * Resolves a promise. @@ -78,7 +71,7 @@ export interface IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise; + ): Promise; /** * Rejects a promise @@ -96,7 +89,7 @@ export interface IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise; + ): Promise; /** * Retrieves a promise based on its id. @@ -104,7 +97,7 @@ export interface IPromiseStore { * @param id Unique identifier for the promise to be retrieved. * @returns A durable promise that is pending, canceled, resolved, or rejected. */ - get(id: string): Promise; + get(id: string): Promise; /** * Search for promises. @@ -120,7 +113,7 @@ export interface IPromiseStore { state: string | undefined, tags: Record | undefined, limit?: number, - ): AsyncGenerator; + ): AsyncGenerator; } /** diff --git a/lib/core/stores/local.ts b/lib/core/stores/local.ts index 7b9d3b8..71570c1 100644 --- a/lib/core/stores/local.ts +++ b/lib/core/stores/local.ts @@ -4,12 +4,7 @@ import { ILogger } from "../logger"; import { Logger } from "../loggers/logger"; import { StoreOptions } from "../options"; import { - DurablePromise, - PendingPromise, - ResolvedPromise, - RejectedPromise, - CanceledPromise, - TimedoutPromise, + DurablePromiseRecord, isPendingPromise, isResolvedPromise, isRejectedPromise, @@ -34,7 +29,7 @@ export class LocalStore implements IStore { constructor( opts: Partial = {}, - promiseStorage: IStorage = new WithTimeout(new MemoryStorage()), + promiseStorage: IStorage = new WithTimeout(new MemoryStorage()), scheduleStorage: IStorage = new MemoryStorage(), lockStorage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(), ) { @@ -122,7 +117,7 @@ export class LocalStore implements IStore { export class LocalPromiseStore implements IPromiseStore { constructor( private store: LocalStore, - private storage: IStorage, + private storage: IStorage, ) {} async create( @@ -133,7 +128,7 @@ export class LocalPromiseStore implements IPromiseStore { data: string | undefined, timeout: number, tags: Record | undefined, - ): Promise { + ): Promise { return this.storage.rmw(id, (promise) => { if (!promise) { return { @@ -174,7 +169,7 @@ export class LocalPromiseStore implements IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise { + ): Promise { return this.storage.rmw(id, (promise) => { if (!promise) { throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND); @@ -219,7 +214,7 @@ export class LocalPromiseStore implements IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise { + ): Promise { return this.storage.rmw(id, (promise) => { if (!promise) { throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND); @@ -264,7 +259,7 @@ export class LocalPromiseStore implements IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise { + ): Promise { return this.storage.rmw(id, (promise) => { if (!promise) { throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND); @@ -303,7 +298,7 @@ export class LocalPromiseStore implements IPromiseStore { }); } - async get(id: string): Promise { + async get(id: string): Promise { const promise = await this.storage.rmw(id, (p) => p); if (!promise) { @@ -318,7 +313,7 @@ export class LocalPromiseStore implements IPromiseStore { state?: string, tags?: Record, limit?: number, - ): AsyncGenerator { + ): AsyncGenerator { // filter the promises returned from all storage const regex = new RegExp(id.replaceAll("*", ".*")); const states = searchStates(state); diff --git a/lib/core/stores/remote.ts b/lib/core/stores/remote.ts index 863a18d..aa49463 100644 --- a/lib/core/stores/remote.ts +++ b/lib/core/stores/remote.ts @@ -4,16 +4,7 @@ import { ErrorCodes, ResonateError } from "../errors"; import { ILogger } from "../logger"; import { Logger } from "../loggers/logger"; import { StoreOptions } from "../options"; -import { - DurablePromise, - PendingPromise, - ResolvedPromise, - RejectedPromise, - CanceledPromise, - TimedoutPromise, - isDurablePromise, - isCompletedPromise, -} from "../promises/types"; +import { DurablePromiseRecord, isDurablePromiseRecord, isCompletedPromise } from "../promises/types"; import { Schedule, isSchedule } from "../schedules/types"; import { IStore, IPromiseStore, IScheduleStore, ILockStore } from "../store"; import * as utils from "../utils"; @@ -125,7 +116,7 @@ export class RemotePromiseStore implements IPromiseStore { data: string | undefined, timeout: number, tags: Record | undefined, - ): Promise { + ): Promise { const reqHeaders: Record = { Strict: JSON.stringify(strict), }; @@ -134,7 +125,7 @@ export class RemotePromiseStore implements IPromiseStore { reqHeaders["Idempotency-Key"] = ikey; } - const promise = await this.store.call("promises", isDurablePromise, { + const promise = await this.store.call("promises", isDurablePromiseRecord, { method: "POST", headers: reqHeaders, body: JSON.stringify({ @@ -157,7 +148,7 @@ export class RemotePromiseStore implements IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise { + ): Promise { const reqHeaders: Record = { Strict: JSON.stringify(strict), }; @@ -166,7 +157,7 @@ export class RemotePromiseStore implements IPromiseStore { reqHeaders["Idempotency-Key"] = ikey; } - const promise = await this.store.call(`promises/${id}`, isCompletedPromise, { + const promise = await this.store.call(`promises/${id}`, isDurablePromiseRecord, { method: "PATCH", headers: reqHeaders, body: JSON.stringify({ @@ -178,6 +169,10 @@ export class RemotePromiseStore implements IPromiseStore { }), }); + if (!isCompletedPromise(promise)) { + throw new ResonateError("Invalid response", ErrorCodes.STORE_PAYLOAD, promise); + } + return decode(promise, this.store.encoder); } @@ -187,7 +182,7 @@ export class RemotePromiseStore implements IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise { + ): Promise { const reqHeaders: Record = { Strict: JSON.stringify(strict), }; @@ -196,7 +191,7 @@ export class RemotePromiseStore implements IPromiseStore { reqHeaders["Idempotency-Key"] = ikey; } - const promise = await this.store.call(`promises/${id}`, isCompletedPromise, { + const promise = await this.store.call(`promises/${id}`, isDurablePromiseRecord, { method: "PATCH", headers: reqHeaders, body: JSON.stringify({ @@ -208,6 +203,10 @@ export class RemotePromiseStore implements IPromiseStore { }), }); + if (!isCompletedPromise(promise)) { + throw new ResonateError("Invalid response", ErrorCodes.STORE_PAYLOAD, promise); + } + return decode(promise, this.store.encoder); } @@ -217,7 +216,7 @@ export class RemotePromiseStore implements IPromiseStore { strict: boolean, headers: Record | undefined, data: string | undefined, - ): Promise { + ): Promise { const reqHeaders: Record = { Strict: JSON.stringify(strict), }; @@ -226,7 +225,7 @@ export class RemotePromiseStore implements IPromiseStore { reqHeaders["Idempotency-Key"] = ikey; } - const promise = await this.store.call(`promises/${id}`, isCompletedPromise, { + const promise = await this.store.call(`promises/${id}`, isDurablePromiseRecord, { method: "PATCH", headers: reqHeaders, body: JSON.stringify({ @@ -238,11 +237,15 @@ export class RemotePromiseStore implements IPromiseStore { }), }); + if (!isCompletedPromise(promise)) { + throw new ResonateError("Invalid response", ErrorCodes.STORE_PAYLOAD, promise); + } + return decode(promise, this.store.encoder); } - async get(id: string): Promise { - const promise = await this.store.call(`promises/${id}`, isDurablePromise, { + async get(id: string): Promise { + const promise = await this.store.call(`promises/${id}`, isDurablePromiseRecord, { method: "GET", }); @@ -254,7 +257,7 @@ export class RemotePromiseStore implements IPromiseStore { state: string | undefined, tags: Record | undefined, limit: number | undefined, - ): AsyncGenerator { + ): AsyncGenerator { let cursor: string | null | undefined = undefined; while (cursor !== null) { @@ -472,7 +475,7 @@ function encode(value: string, encoder: IEncoder): string { } } -function decode

(promise: P, encoder: IEncoder): P { +function decode

(promise: P, encoder: IEncoder): P { try { if (promise.param?.data) { promise.param.data = encoder.decode(promise.param.data); @@ -490,7 +493,7 @@ function decode

(promise: P, encoder: IEncoder