Skip to content

Commit

Permalink
wip recover
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Sergeev committed Feb 8, 2024
1 parent b284b10 commit a5d4636
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 29 deletions.
19 changes: 19 additions & 0 deletions src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -767,4 +767,23 @@ test("Template Data Change Callback with rate limit", async () => {
// wait time in rate limit function is over).
expect(counts).toEqual([0,10]);

});

test("Pulsar consumer WIP", async () => {
const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-pulsar.yaml');
const templateYaml = fs.readFileSync(yamlFilePath, 'utf8');
let template = yaml.load(templateYaml);

const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template);
// keep steps execution logs for debugging
tp.options = {'keepLogs': true}

await tp.initialize();

while (tp.output.rebelForces.length < 4) {
await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms
}

expect(tp.output.rebelForces).toEqual(['chewbacca', 'luke', 'han', 'leia']);

});
80 changes: 52 additions & 28 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ export class StatedWorkflow {
"publish": this.publish.bind(this),
"recover": this.recover.bind(this),
"logFunctionInvocation": this.logFunctionInvocation.bind(this),
"workflow": this.workflow.bind(this)
"workflow": this.workflow.bind(this),
"recoverTo": this.recoverTo.bind(this)
}
};
this.templateProcessor = new TemplateProcessor(template, context);
this.templateProcessor.functionGenerators.set("serial", this.serialGenerator.bind(this));
this.templateProcessor.functionGenerators.set("parallel", this.parallelGenerator.bind(this));
this.templateProcessor.functionGenerators.set("recover", this.recoverGenerator.bind(this));
// this.templateProcessor.functionGenerators.set("subscribe", this.subscribeGenerator.bind(this));
this.templateProcessor.logLevel = logLevel.ERROR; //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default
}

Expand Down Expand Up @@ -122,27 +124,6 @@ export class StatedWorkflow {
}
}

async subscribe(subscribeOptions) {
const {source} = subscribeOptions;
this.logger.debug(`subscribing ${StatedREPL.stringify(source)}`);

if(!this.workflowDispatcher) {
this.workflowDispatcher = new WorkflowDispatcher(subscribeOptions);
this.templateProcessor.onInitialize = this.workflowDispatcher.clear.bind(this.workflowDispatcher); //must remove all subscribers when template reinitialized
}


if (source === 'http') {
return StatedWorkflow.onHttp(subscribeOptions);
}
if (source === 'cloudEvent') {
return this.subscribeCloudEvent(subscribeOptions);
}
if (!source) {
throw new Error("Subscribe source not set");
}
throw new Error(`Unknown subscribe source ${source}`);
}

static ensureClient(params) {
if (!params || params.type == 'pulsar') {
Expand Down Expand Up @@ -272,6 +253,39 @@ export class StatedWorkflow {

}

async subscribeGenerator(metaInf, tp) {
return async (subscribeOptions) => {

const resolvedJsonPointers = await TemplateUtils.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'subscribe'); //fixme todo we should avoid doing this for every jsonata evaluation
TemplateUtils.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'subscribe');
const resolvedJsonPointers2 = await TemplateUtils.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'subscribeParams');

return this.subscribe(subscribeOptions, context, resolvedJsonPointers, tp);
}
}

async subscribe(subscribeOptions) {
const {source} = subscribeOptions;
this.logger.debug(`subscribing ${StatedREPL.stringify(source)}`);

if(!this.workflowDispatcher) {
this.workflowDispatcher = new WorkflowDispatcher(subscribeOptions);
this.templateProcessor.onInitialize = this.workflowDispatcher.clear.bind(this.workflowDispatcher); //must remove all subscribers when template reinitialized
}


if (source === 'http') {
return StatedWorkflow.onHttp(subscribeOptions);
}
if (source === 'cloudEvent') {
return this.subscribeCloudEvent(subscribeOptions);
}
if (!source) {
throw new Error("Subscribe source not set");
}
throw new Error(`Unknown subscribe source ${source}`);
}

subscribePulsar(subscriptionParams) {
const {type, initialPosition = 'earliest', maxConsume = -1} = subscriptionParams;
this.logger.debug(`pulsar subscribe params ${StatedREPL.stringify(subscriptionParams)}`);
Expand All @@ -295,7 +309,7 @@ export class StatedWorkflow {
let countdown = maxConsume;
let resolve;
this.templateProcessor.setDataChangeCallback('/', async (data, jsonPtr, removed) => {
if (jsonPtr === '/step0/log/*/args') { //TODO: regexify
if (jsonPtr === '/joinResistanceStep/log/*/args') { //TODO: regexify
// TODO: await persist the step
resolve();
}
Expand Down Expand Up @@ -445,14 +459,20 @@ export class StatedWorkflow {
workflowInvocation = StatedWorkflow.generateDateAndTimeBasedID();
}

if (input === '__recover__' && stepJsons?.[0]) {
const step = new Step(stepJsons[0], StatedWorkflow.persistence, resolvedJsonPointers?.[0], tp);
for (let workflowInvocation of step.log.getInvocations()){
await this.serial(undefined, stepJsons, {workflowInvocation}, resolvedJsonPointers, tp);
}
return;
}

let currentInput = input;
const steps = [];
for (let i = 0; i < stepJsons.length; i++) {
if(currentInput !== undefined) {
const step = new Step(stepJsons[i], StatedWorkflow.persistence, resolvedJsonPointers?.[i], tp);
steps.push(step);
currentInput = await this.runStep(workflowInvocation, step, currentInput);
}
const step = new Step(stepJsons[i], StatedWorkflow.persistence, resolvedJsonPointers?.[i], tp);
steps.push(step);
currentInput = await this.runStep(workflowInvocation, step, currentInput);
}

if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps);
Expand Down Expand Up @@ -545,6 +565,10 @@ export class StatedWorkflow {
}
}

async recoverTo(to) {
return await to('__recover__');
}

async runStep(workflowInvocation, step, input){

const {instruction, event:loggedEvent} = step.log.getCourseOfAction(workflowInvocation);
Expand Down
38 changes: 37 additions & 1 deletion src/workflow/WorkflowDispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
import StatedREPL from "stated-js/dist/src/StatedREPL.js";
import {StatedWorkflow} from "./StatedWorkflow.js";
import Step from "./Step.js";

// This class is used to add events to a queue and dispatch them to one or more subscribed workflow function with the
// given parallelism. Tracks the number of active events and the number of events in the queue.
Expand Down Expand Up @@ -93,6 +94,7 @@ export class WorkflowDispatcher {
const eventData = this.queue.shift();

let promise;
// WIP check
if (this.workflowFunction && this.workflowFunction.function) {
promise = this._runStep(this.workflowFunction, eventData);
} else {
Expand All @@ -117,14 +119,48 @@ export class WorkflowDispatcher {
}
}

_runStep(stepJson, input) {
// WIP method
async _runStep(stepJson, input) {
let workflowInvocation;

if (workflowInvocation === undefined) {
workflowInvocation = StatedWorkflow.generateDateAndTimeBasedID();
}

const step = new Step(stepJson, StatedWorkflow.persistence, resolvedJsonPointers?.[i], tp);

const {instruction, event:loggedEvent} = step.log.getCourseOfAction(workflowInvocation);
if(instruction === "START"){
return await step.run(workflowInvocation, input);
}else if (instruction === "RESTART"){
return await step.run(workflowInvocation, loggedEvent.args);
} else if(instruction === "SKIP"){
return loggedEvent.out;
}else{
throw new Error(`unknown courseOfAction: ${instruction}`);
}

const promise = this.runStep(workflowInvocation, step, currentInput);

return promise;
}


// TODO: the runStep logic should be moved to a Step class
async runStep(workflowInvocation, step, input){

const {instruction, event:loggedEvent} = step.log.getCourseOfAction(workflowInvocation);
if(instruction === "START"){
return await step.run(workflowInvocation, input);
}else if (instruction === "RESTART"){
return await step.run(workflowInvocation, loggedEvent.args);
} else if(instruction === "SKIP"){
return loggedEvent.out;
}else{
throw new Error(`unknown courseOfAction: ${instruction}`);
}
}

addToQueue(data) {
this.queue.push(data);
this._dispatch();
Expand Down

0 comments on commit a5d4636

Please sign in to comment.