Skip to content

Commit

Permalink
Move polling of tasks into the task source and out of the store
Browse files Browse the repository at this point in the history
  • Loading branch information
avillega committed Sep 3, 2024
1 parent f5882ae commit 318d600
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 203 deletions.
9 changes: 2 additions & 7 deletions lib/core/store.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DurablePromiseRecord } from "./promises/types";

import { Schedule } from "./schedules/types";
import { ResumeBody } from "./tasks";
import { CallbackRecord, ResumeBody } from "./tasks";

/**
* Store Interface
Expand All @@ -12,11 +12,6 @@ export interface IStore {
readonly locks: ILockStore;
readonly callbacks: ICallbackStore;
readonly tasks: ITaskStore;

/**
* Do neccesary clean up and free of resources on stop
*/
stop(): void;
}

/**
Expand Down Expand Up @@ -232,5 +227,5 @@ export interface ITaskStore {
* Callback Store API
*/
export interface ICallbackStore {
create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<boolean>;
create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<CallbackRecord>;
}
130 changes: 51 additions & 79 deletions lib/core/stores/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import {
isRejectedPromise,
isCanceledPromise,
isTimedoutPromise,
isCompletedPromise,
} from "../promises/types";
import { Schedule } from "../schedules/types";
import { IStorage } from "../storage";
import { MemoryStorage } from "../storages/memory";
import { WithTimeout } from "../storages/withTimeout";
import { IStore, IPromiseStore, IScheduleStore, ILockStore, ICallbackStore, ITaskStore } from "../store";
import { ResumeBody, isResumeBody } from "../tasks";
import { LocalTasksSource } from "../tasksSources/local";
import { CallbackRecord, ResumeBody, isResumeBody } from "../tasks";

export class LocalStore implements IStore {
public promises: LocalPromiseStore;
Expand All @@ -31,43 +29,23 @@ export class LocalStore implements IStore {

private toSchedule: Schedule[] = [];
private next: number | undefined = undefined;
private tasksSource: LocalTasksSource;
private callbacksTimeout: NodeJS.Timeout | undefined;

constructor(
tasksSource: LocalTasksSource,
opts: Partial<StoreOptions> = {},
promiseStorage: IStorage<DurablePromiseRecord> = new WithTimeout(new MemoryStorage<DurablePromiseRecord>()),
scheduleStorage: IStorage<Schedule> = new MemoryStorage<Schedule>(),
lockStorage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(),
callbacksStorage: IStorage<{ id: string; data: string }> = new MemoryStorage<{
id: string;
data: string;
}>(),
taskStorage: IStorage<{ id: string; counter: number; data: string }> = new MemoryStorage<{
id: string;
counter: number;
data: string;
}>(),
callbacksStorage: IStorage<CallbackRecord> = new MemoryStorage<CallbackRecord>(),
) {
this.callbacks = new LocalCallbackStore(this, callbacksStorage);
this.promises = new LocalPromiseStore(this, promiseStorage);
this.schedules = new LocalScheduleStore(this, scheduleStorage);
this.locks = new LocalLockStore(this, lockStorage);
this.tasks = new LocalTaskStore(this, taskStorage);
this.tasks = new LocalTaskStore(this);

this.logger = opts.logger ?? new Logger();
this.tasksSource = tasksSource;

this.init();
this.handleCallbacks();
}

stop(): void {
clearTimeout(this.next);
this.next = undefined;
clearTimeout(this.callbacksTimeout);
this.callbacksTimeout = undefined;
}

// handler the schedule store can call
Expand Down Expand Up @@ -140,28 +118,6 @@ export class LocalStore implements IStore {
.replace("{{.id}}", schedule.id)
.replace("{{.timestamp}}", schedule.nextRunTime.toString());
}

// Handles all the callbacks
async handleCallbacks() {
clearTimeout(this.callbacksTimeout);

this.callbacksTimeout = setInterval(async () => {
const callbacksToDelete = [];
for await (const callbacks of this.callbacks.getAll()) {
for (const callback of callbacks) {
const promise = await this.promises.get(callback.id);
if (isCompletedPromise(promise)) {
const task = await this.tasks.create(promise.id, callback.data);
this.tasksSource.emitTask(task);
callbacksToDelete.push(callback);
}
}
}
for (const callback of callbacksToDelete) {
await this.callbacks.delete(callback.id);
}
}, 500);
}
}

export class LocalPromiseStore implements IPromiseStore {
Expand Down Expand Up @@ -545,67 +501,83 @@ export class LocalLockStore implements ILockStore {
export class LocalCallbackStore implements ICallbackStore {
constructor(
private store: LocalStore,
private storage: IStorage<{ id: string; data: string }>,
private storage: IStorage<CallbackRecord>,
) {}

async create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<boolean> {
await this.storage.rmw(promiseId, (callback) => {
async create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<CallbackRecord> {
const promise = await this.store.promises.get(promiseId);
const callbackRecord = {
callback: {
id: promiseId,
promiseId,
message: {
data: data,
recv: recv,
},
timeout: timeout,
createdOn: Date.now(),
},
promise: promise,
};

if (promise.state !== "PENDING") {
// Returns a mock callback with the resolved promise, doesn't actually creates the callback
return callbackRecord;
}

// If the promise is pending creates the callback for it.
return await this.storage.rmw(promiseId, (callback) => {
if (!callback) {
return {
id: promiseId,
data: data ?? "",
};
return callbackRecord;
}
return callback;
});
return true;
}

async get(promiseId: string): Promise<{ id: string; data: string } | undefined> {
async get(promiseId: string): Promise<CallbackRecord> {
return this.storage.rmw(promiseId, (callback) => {
if (callback) return callback;
if (!callback) {
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
}
return callback;
});
}

getAll(): AsyncGenerator<{ id: string; data: string }[], void, unknown> {
return this.storage.all();
async getAll(): Promise<CallbackRecord[]> {
// TODO(avillega): Migrate this loop to `Array.fromAsync` once we make node 22 our minimum version
const result = [];
for await (const callbacks of this.storage.all()) {
result.push(...callbacks);
}
return result;
}

async delete(callbackId: string): Promise<boolean> {
return await this.storage.rmd(callbackId, (callback) => callback.id === callbackId);
return await this.storage.rmd(callbackId, (callback) => callback.callback.id === callbackId);
}
}

export class LocalTaskStore implements ITaskStore {
constructor(
private store: LocalStore,
private storage: IStorage<{ id: string; counter: number; data: string }>,
) {}

create(taskId: string, data: string): Promise<{ id: string; counter: number }> {
return this.storage.rmw(taskId, (task) => {
return task ? task : { id: taskId, counter: 0, data: data };
});
}
constructor(private store: LocalStore) {}

// NOTE: Just for the Local Store the taskId === callbackId which allows us to
// claim the task by getting the data from the callback
async claim(taskId: string, count: number): Promise<ResumeBody> {
const task = await this.storage.rmw(taskId, (task) => {
if (task) {
return task;
}
});
if (!task) {
const callback = await this.store.callbacks.get(taskId);
if (!callback) {
throw new ResonateError("Task not found", ErrorCodes.STORE_NOT_FOUND);
}

const resumeBody = JSON.parse(task.data);
const resumeBody = JSON.parse(callback.callback.message.data);
if (!isResumeBody(resumeBody)) {
throw new ResonateError("Invalid response", ErrorCodes.STORE_PAYLOAD, resumeBody);
}
return resumeBody;
}

complete(taskId: string, count: number): Promise<boolean> {
return this.storage.rmd(taskId, (task) => task.id === taskId);
async complete(taskId: string, count: number): Promise<boolean> {
await this.store.callbacks.delete(taskId);
return true;
}
}

Expand Down
11 changes: 3 additions & 8 deletions lib/core/stores/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { StoreOptions } from "../options";
import { DurablePromiseRecord, isDurablePromiseRecord, isCompletedPromise } from "../promises/types";
import { Schedule, isSchedule } from "../schedules/types";
import { IStore, IPromiseStore, IScheduleStore, ILockStore, ICallbackStore, ITaskStore } from "../store";
import { ResumeBody, isResumeBody } from "../tasks";
import { CallbackRecord, ResumeBody, isCallbackRecord, isResumeBody } from "../tasks";
import * as utils from "../utils";

export class RemoteStore implements IStore {
Expand Down Expand Up @@ -52,10 +52,6 @@ export class RemoteStore implements IStore {
}
}

stop(): void {
// Intentionally nop
}

async call<T>(path: string, guard: (b: unknown) => b is T, options: RequestInit): Promise<T> {
let error: unknown;

Expand Down Expand Up @@ -566,8 +562,8 @@ export class RemoteTasksStore implements ITaskStore {
export class RemoteCallbackStore implements ICallbackStore {
constructor(private store: RemoteStore) {}

async create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<boolean> {
await this.store.call("callbacks", (b: unknown): b is any => true, {
async create(promiseId: string, recv: string, timeout: number, data: string | undefined): Promise<CallbackRecord> {
return this.store.call("callbacks", isCallbackRecord, {
method: "POST",
body: JSON.stringify({
promiseId,
Expand All @@ -576,7 +572,6 @@ export class RemoteCallbackStore implements ICallbackStore {
data: data ? encode(data, this.store.encoder) : undefined,
}),
});
return true;
}
}

Expand Down
Loading

0 comments on commit 318d600

Please sign in to comment.