Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve execute to eliminate race conditions #99

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "stated-js",
"version": "0.1.50",
"version": "0.1.51",
"license": "Apache-2.0",
"description": "JSONata embedded in JSON",
"main": "./dist/src/index.js",
Expand Down
13 changes: 11 additions & 2 deletions src/ParallelExecutionPlanDefault.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
jsonPtr: JsonPointerString = "/";
didUpdate: boolean = false;
restore?: boolean = false;
circular?:boolean;

constructor(tp: TemplateProcessor, parallelSteps: ParallelExecutionPlan[] = [], vals?: Partial<ParallelExecutionPlan> | null) {
this.output = tp.output;
this.parallel = parallelSteps;
Expand All @@ -38,6 +40,9 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
if (p.data) {
(json as any).data = p.data;
}
if(p.circular){
(json as any).circular = p.circular
}
return json;
}

Expand All @@ -46,7 +51,7 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
}

cleanCopy(tp: TemplateProcessor, source: ParallelExecutionPlanDefault = this): ParallelExecutionPlanDefault {
return new ParallelExecutionPlanDefault(tp, [], {
const fields:any = {
op: source.op,
parallel: source.parallel.map(p => source.cleanCopy(tp, p as any)),
completed: false,
Expand All @@ -55,7 +60,11 @@ export class ParallelExecutionPlanDefault implements ParallelExecutionPlan {
forkId: "ROOT",
didUpdate: false,
data: source.data
});
};
if(source.circular){
fields.circular = source.circular;
}
return new ParallelExecutionPlanDefault(tp, [], fields);
}

getNodeList(all: boolean = false): JsonPointerString[] {
Expand Down
123 changes: 65 additions & 58 deletions src/ParallelPlanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ export class ParallelPlanner implements Planner{
return leaves;
}



async execute(plan: ExecutionPlan): Promise<void>{
//console.log(`executing ${stringifyTemplateJSON(plan)}`);
Expand All @@ -279,76 +279,83 @@ export class ParallelPlanner implements Planner{
* @param step
*/
const _execute = async (step: ParallelExecutionPlan): Promise<ParallelExecutionPlan> => {
const { jsonPtr, op } = step;
const { jsonPtr, op, circular=false } = step;

if(circular){ //if step is marked as circular, evaluate it's expression and bail without following its reference
step.didUpdate = await this.tp.evaluateNode(step);
this.tp.logger.debug(`execute: circular, abort ${step.jsonPtr}`);
return step;
}

// Check if a Promise already exists for this jsonPtr (mutation is put in the map first since it is the root of the plan, so will be found by the leaves that depend on it)
if (promises.has(jsonPtr)) {
this.tp.logger.debug(`execute: waiting ${step.jsonPtr}`);
const promise = promises.get(jsonPtr)!; //don't freak out ... '!' is TS non-null assertion
//return a 'pointer' to the cached plan, or else we create loops with lead nodes in a mutation plan pointing back to the root of the
//plan that holds the mutation
return promise.then(plan=>{
return (plan as ParallelExecutionPlanDefault).getPointer(this.tp);
});
});
}

// Create a placeholder Promise immediately and store it in the map
const placeholderPromise: Promise<ParallelExecutionPlan> = new Promise<ParallelExecutionPlan>((resolve, reject) => {
promises.set(
jsonPtr,
(async () => {
try {
step.output = plan.output;
step.forkId = plan.forkId;
step.forkStack = plan.forkStack;
//await all dependencies ...and replace the parallel array with the executed array, since the
//`promises` Map has caused already executed subtrees to be replaces with their cache-normalized
//equivalent
step.parallel = await Promise.all(
step.parallel.map((d) => {
const executed = _execute(d);
return executed
})
);
//if we are initializing the node, or of it had dependencies that changed, then we
//need to run the step
if( plan.op === "initialize" || step.parallel.some((step) => step.didUpdate || step.completed)){
if(!step.completed){ //fast forward past completed steps
try {
step.didUpdate = await this.tp.evaluateNode(step);
}catch(error){
throw error;
}
}
}else { //if we are here then we are not initializing a node, but reacting to some mutation.
//and it is possible that the node itself is originally a non-materialized node, that
//has now become materialized because it is inside/within a subtree set by a mutation
const _plan = plan as ParallelExecutionPlanDefault;
const insideMutation = this.stepIsDescendantOfMutationTarget(step, _plan );
if(insideMutation){
const theMutation = promises.get(_plan.jsonPtr);
if(!theMutation){
throw new Error(`failed to retrieve mutation from cache for ${_plan.jsonPtr}`);
}
const mutationCausedAChange= (await theMutation).didUpdate;
if(mutationCausedAChange){
const _didUpdate= await this.tp.evaluateNode(step);
step.didUpdate = _didUpdate;
}
}
let resolve:any, reject:any;
const placeHolderPromise:Promise<ParallelExecutionPlan> = new Promise((res, rej) => {
resolve = res;
reject = rej;
});

const executor = async () => {
try {
step.output = plan.output;
step.forkId = plan.forkId;
step.forkStack = plan.forkStack;
//await all dependencies ...and replace the parallel array with the executed array, since the
//`promises` Map has caused already executed subtrees to be replaces with their cache-normalized
//equivalent
step.parallel = await Promise.all(
step.parallel.map((d) => {
const executed = _execute(d);
return executed
})
);
//if we are initializing the node, or of it had dependencies that changed, then we
//need to run the step
if( plan.op === "initialize" || step.parallel.some((step) => step.didUpdate || step.completed)){
if(!step.completed){ //fast forward past completed steps
this.tp.logger.debug(`execute: evaluate ${step.jsonPtr}`);
step.didUpdate = await this.tp.evaluateNode(step);
}
}else { //if we are here then we are not initializing a node, but reacting to some mutation.
//and it is possible that the node itself is originally a non-materialized node, that
//has now become materialized because it is inside/within a subtree set by a mutation
const _plan = plan as ParallelExecutionPlanDefault;
const insideMutation = this.stepIsDescendantOfMutationTarget(step, _plan );
if(insideMutation){
const theMutation = promises.get(_plan.jsonPtr);
if(!theMutation){
throw new Error(`failed to retrieve mutation from cache for ${_plan.jsonPtr}`);
}
const mutationCausedAChange= (await theMutation).didUpdate;
if(mutationCausedAChange){
this.tp.logger.debug(`execute: evaluate ${step.jsonPtr}`);
const _didUpdate= await this.tp.evaluateNode(step);
step.didUpdate = _didUpdate;
}
step.completed = true;
resolve(step);
} catch (error) {
promises.delete(jsonPtr); // Clean up failed promise
reject(error);
}
})().then(() => {//IIFE returns void, so we can't take it as parameter to then
return step as ParallelExecutionPlan; //return the step from the closure
}) // Explicitly return a ParallelExecutionPlan becuase IIFE does not return a known type
);
});
}
step.completed = true;
resolve(step);
} catch (error) {
promises.delete(jsonPtr); // Clean up failed promise
reject(error);
}
};

promises.set(jsonPtr, placeHolderPromise)
executor(); //it is critical that the placeholder be placed in the promises map before we begin executing


return placeholderPromise;
return placeHolderPromise;
}; //end _execute

try { //mutation plan
Expand Down
3 changes: 2 additions & 1 deletion src/TemplateProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ export type PlanStep = {
output:object,
forkStack:Fork[],
forkId:string,
didUpdate:boolean
didUpdate:boolean,
circular?:boolean
}
export type Mutation = {jsonPtr:JsonPointerString, op:Op, data:any};

Expand Down
1 change: 1 addition & 0 deletions src/TraversalState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export class TraversalState {
const e = '🔃 Circular dependency ' + this.stack.map(n => n.jsonPtr).join(' → ') + " → " + jsonPtr;
this.tp.warnings.push(e);
this.tp.logger.log('warn', e);
node.circular = true; //flag this step as circular so that during plan execution we can drop circular steps
return false;
}
if (this.stack[this.stack.length - 1]?.op === "noop") {
Expand Down
3 changes: 3 additions & 0 deletions src/test/TemplateProcessor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5491,6 +5491,7 @@ test("total cost example", async () => {
"op": "initialize",
"parallel": [
{
"circular": true,
"completed": false,
"didUpdate": false,
"forkId": "ROOT",
Expand All @@ -5510,6 +5511,7 @@ test("total cost example", async () => {
"op": "initialize",
"parallel": [
{
"circular": true,
"completed": false,
"didUpdate": false,
"forkId": "ROOT",
Expand All @@ -5529,6 +5531,7 @@ test("total cost example", async () => {
"op": "initialize",
"parallel": [
{
"circular": true,
"completed": false,
"didUpdate": false,
"forkId": "ROOT",
Expand Down
Loading