diff --git a/package.json b/package.json index 191591a4..e2c701f8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "stated-js", - "version": "0.1.50", + "version": "0.1.51", "license": "Apache-2.0", "description": "JSONata embedded in JSON", "main": "./dist/src/index.js", diff --git a/src/ParallelExecutionPlanDefault.ts b/src/ParallelExecutionPlanDefault.ts index 1c18b471..6523f5c8 100644 --- a/src/ParallelExecutionPlanDefault.ts +++ b/src/ParallelExecutionPlanDefault.ts @@ -16,6 +16,8 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan { jsonPtr: JsonPointerString = "/"; didUpdate: boolean = false; restore?: boolean = false; + circular?:boolean; + constructor(tp: TemplateProcessor, parallelSteps: ParallelExecutionPlan[] = [], vals?: Partial | null) { this.output = tp.output; this.parallel = parallelSteps; @@ -38,6 +40,9 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan { if (p.data) { (json as any).data = p.data; } + if(p.circular){ + (json as any).circular = p.circular + } return json; } @@ -46,7 +51,7 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan { } cleanCopy(tp: TemplateProcessor, source: ParallelExecutionPlanDefault = this): ParallelExecutionPlanDefault { - return new ParallelExecutionPlanDefault(tp, [], { + const fields:any = { op: source.op, parallel: source.parallel.map(p => source.cleanCopy(tp, p as any)), completed: false, @@ -55,7 +60,11 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan { forkId: "ROOT", didUpdate: false, data: source.data - }); + }; + if(source.circular){ + fields.circular = source.circular; + } + return new ParallelExecutionPlanDefault(tp, [], fields); } getNodeList(all: boolean = false): JsonPointerString[] { diff --git a/src/ParallelPlanner.ts b/src/ParallelPlanner.ts index 436d1128..99235ce6 100644 --- a/src/ParallelPlanner.ts +++ b/src/ParallelPlanner.ts @@ -264,7 +264,7 @@ export class ParallelPlanner implements Planner{ return leaves; } - + async execute(plan: ExecutionPlan): Promise{ //console.log(`executing ${stringifyTemplateJSON(plan)}`); @@ -279,76 +279,83 @@ export class ParallelPlanner implements Planner{ * @param step */ const _execute = async (step: ParallelExecutionPlan): Promise => { - const { jsonPtr, op } = step; + const { jsonPtr, op, circular=false } = step; + + if(circular){ //if step is marked as circular, evaluate it's expression and bail without following its reference + step.didUpdate = await this.tp.evaluateNode(step); + this.tp.logger.debug(`execute: circular, abort ${step.jsonPtr}`); + return step; + } // Check if a Promise already exists for this jsonPtr (mutation is put in the map first since it is the root of the plan, so will be found by the leaves that depend on it) if (promises.has(jsonPtr)) { + this.tp.logger.debug(`execute: waiting ${step.jsonPtr}`); const promise = promises.get(jsonPtr)!; //don't freak out ... '!' is TS non-null assertion //return a 'pointer' to the cached plan, or else we create loops with lead nodes in a mutation plan pointing back to the root of the //plan that holds the mutation return promise.then(plan=>{ return (plan as ParallelExecutionPlanDefault).getPointer(this.tp); - }); + }); } - // Create a placeholder Promise immediately and store it in the map - const placeholderPromise: Promise = new Promise((resolve, reject) => { - promises.set( - jsonPtr, - (async () => { - try { - step.output = plan.output; - step.forkId = plan.forkId; - step.forkStack = plan.forkStack; - //await all dependencies ...and replace the parallel array with the executed array, since the - //`promises` Map has caused already executed subtrees to be replaces with their cache-normalized - //equivalent - step.parallel = await Promise.all( - step.parallel.map((d) => { - const executed = _execute(d); - return executed - }) - ); - //if we are initializing the node, or of it had dependencies that changed, then we - //need to run the step - if( plan.op === "initialize" || step.parallel.some((step) => step.didUpdate || step.completed)){ - if(!step.completed){ //fast forward past completed steps - try { - step.didUpdate = await this.tp.evaluateNode(step); - }catch(error){ - throw error; - } - } - }else { //if we are here then we are not initializing a node, but reacting to some mutation. - //and it is possible that the node itself is originally a non-materialized node, that - //has now become materialized because it is inside/within a subtree set by a mutation - const _plan = plan as ParallelExecutionPlanDefault; - const insideMutation = this.stepIsDescendantOfMutationTarget(step, _plan ); - if(insideMutation){ - const theMutation = promises.get(_plan.jsonPtr); - if(!theMutation){ - throw new Error(`failed to retrieve mutation from cache for ${_plan.jsonPtr}`); - } - const mutationCausedAChange= (await theMutation).didUpdate; - if(mutationCausedAChange){ - const _didUpdate= await this.tp.evaluateNode(step); - step.didUpdate = _didUpdate; - } - } + let resolve:any, reject:any; + const placeHolderPromise:Promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + const executor = async () => { + try { + step.output = plan.output; + step.forkId = plan.forkId; + step.forkStack = plan.forkStack; + //await all dependencies ...and replace the parallel array with the executed array, since the + //`promises` Map has caused already executed subtrees to be replaces with their cache-normalized + //equivalent + step.parallel = await Promise.all( + step.parallel.map((d) => { + const executed = _execute(d); + return executed + }) + ); + //if we are initializing the node, or of it had dependencies that changed, then we + //need to run the step + if( plan.op === "initialize" || step.parallel.some((step) => step.didUpdate || step.completed)){ + if(!step.completed){ //fast forward past completed steps + this.tp.logger.debug(`execute: evaluate ${step.jsonPtr}`); + step.didUpdate = await this.tp.evaluateNode(step); + } + }else { //if we are here then we are not initializing a node, but reacting to some mutation. + //and it is possible that the node itself is originally a non-materialized node, that + //has now become materialized because it is inside/within a subtree set by a mutation + const _plan = plan as ParallelExecutionPlanDefault; + const insideMutation = this.stepIsDescendantOfMutationTarget(step, _plan ); + if(insideMutation){ + const theMutation = promises.get(_plan.jsonPtr); + if(!theMutation){ + throw new Error(`failed to retrieve mutation from cache for ${_plan.jsonPtr}`); + } + const mutationCausedAChange= (await theMutation).didUpdate; + if(mutationCausedAChange){ + this.tp.logger.debug(`execute: evaluate ${step.jsonPtr}`); + const _didUpdate= await this.tp.evaluateNode(step); + step.didUpdate = _didUpdate; } - step.completed = true; - resolve(step); - } catch (error) { - promises.delete(jsonPtr); // Clean up failed promise - reject(error); } - })().then(() => {//IIFE returns void, so we can't take it as parameter to then - return step as ParallelExecutionPlan; //return the step from the closure - }) // Explicitly return a ParallelExecutionPlan becuase IIFE does not return a known type - ); - }); + } + step.completed = true; + resolve(step); + } catch (error) { + promises.delete(jsonPtr); // Clean up failed promise + reject(error); + } + }; + + promises.set(jsonPtr, placeHolderPromise) + executor(); //it is critical that the placeholder be placed in the promises map before we begin executing + - return placeholderPromise; + return placeHolderPromise; }; //end _execute try { //mutation plan diff --git a/src/TemplateProcessor.ts b/src/TemplateProcessor.ts index a1f7dad8..9444f48c 100644 --- a/src/TemplateProcessor.ts +++ b/src/TemplateProcessor.ts @@ -76,7 +76,8 @@ export type PlanStep = { output:object, forkStack:Fork[], forkId:string, - didUpdate:boolean + didUpdate:boolean, + circular?:boolean } export type Mutation = {jsonPtr:JsonPointerString, op:Op, data:any}; diff --git a/src/TraversalState.ts b/src/TraversalState.ts index 1fcb4887..24c4dba9 100644 --- a/src/TraversalState.ts +++ b/src/TraversalState.ts @@ -21,6 +21,7 @@ export class TraversalState { const e = '🔃 Circular dependency ' + this.stack.map(n => n.jsonPtr).join(' → ') + " → " + jsonPtr; this.tp.warnings.push(e); this.tp.logger.log('warn', e); + node.circular = true; //flag this step as circular so that during plan execution we can drop circular steps return false; } if (this.stack[this.stack.length - 1]?.op === "noop") { diff --git a/src/test/TemplateProcessor.test.js b/src/test/TemplateProcessor.test.js index b21603a9..467f995d 100644 --- a/src/test/TemplateProcessor.test.js +++ b/src/test/TemplateProcessor.test.js @@ -5491,6 +5491,7 @@ test("total cost example", async () => { "op": "initialize", "parallel": [ { + "circular": true, "completed": false, "didUpdate": false, "forkId": "ROOT", @@ -5510,6 +5511,7 @@ test("total cost example", async () => { "op": "initialize", "parallel": [ { + "circular": true, "completed": false, "didUpdate": false, "forkId": "ROOT", @@ -5529,6 +5531,7 @@ test("total cost example", async () => { "op": "initialize", "parallel": [ { + "circular": true, "completed": false, "didUpdate": false, "forkId": "ROOT",