Skip to content

Commit

Permalink
serial & parallel clean up (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhirafovod authored May 20, 2024
1 parent 747a92f commit 791146c
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 345 deletions.
272 changes: 101 additions & 171 deletions README.md

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions example/joinResistance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ produceParams:
type: "my-topic"
client:
type: test
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?search='&$)}
data: ${['han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge']}
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${joinResistance}
to: /${ function($rebel) { $forked( "/rebel", $rebel) } }
subscriberId: rebelArmy
initialPosition: latest
parallelism: 1
parallelism: 2
client:
type: test
acks: []
rebel: "luke"
joinResistance: |
/${
function($url){(
$rebel := $fetch($url).json().results[0].name;
$set( "/rebelForces/-", $rebel) /* append rebel to rebelForces */
)}
}
/${(
$details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name;
$joined( "/rebelForces/-", $details);
)}
# starts producer function
send$: $publish(produceParams)
# starts consumer function
Expand Down
58 changes: 23 additions & 35 deletions example/joinResistanceRecovery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,39 @@ produceParams:
client:
type: test # test client produces directly to the test subscriber dispatcher
data: ['luke', 'han', 'leia']
# data: "${ ['luke', 'han', 'leia'].({name:$, type: /type}) }"
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${saveRebelWorkflow}
to: /${ function($cloudEvent) { $forked( "/rebel", $cloudEvent) }}
subscriberId: rebelArmy
initialPosition: latest
parallelism: 1
client:
type: test
acks: []
saveRebelWorkflow:
function: |
/${
function($rebel){
$rebel ~> $serial(
[fetchRebel, saveRebel],
{'workflowInvocation': $rebel}
) } }
fetchRebel:
function: |
/${
function($rebel){(
$console.debug('fetchRebel input: ' & $rebel);
$r := $rebel.$fetch('https://swapi.tech/api/people/?name='&$).json().result[0].properties;
$console.debug('fetchRebel fetched: ' & $r);
$set('/fetchLog/-',$rebel);
$console.debug('logged fetch: ' & $r);
$r;
)}
}
saveRebel:
function: |
/${
function($rebel){(
$console.debug('saveRebel input: ' & $rebel);
($count(rebels) = 1 and simulateFailure)?(
$set('/simulateFailure', false);
$console.log('sleep forever on : ' & $rebel);
$sleep(1000000);
);
$rebel ? $set('/rebels/-',{'name':$rebel.name, 'url':$rebel.homeworld});
$console.debug('saveRebel saved: ' & {'name':$rebel.name, 'url':$rebel.homeworld});
)}
}
input: null
fetchRebel: |
/${(
$console.debug('fetchRebel input: ' & input);
$r := input.$fetch('https://swapi.tech/api/people/?name='&$).json().result[0].properties;
$console.debug('fetchRebel fetched: ' & $r);
$set('/fetchLog/-',$rebel);
$console.debug('logged fetch: ' & $r);
$r;
)}
saveRebel: |
/${(
$console.debug('saveRebel input: ' & fetchRebel);
($count(rebels) = 1 and simulateFailure)?(
$set('/simulateFailure', false);
$console.log('sleep forever on : ' & fetchRebel);
$sleep(1000000);
);
fetchRebel ? $set('/rebels/-',{'name':$rebel.name, 'url':$rebel.homeworld});
$console.debug('saveRebel saved: ' & {'name':$rebel.name, 'url':$rebel.homeworld});
)}
# starts producer function
send$: $publish(produceParams)
# starts consumer function
Expand Down
176 changes: 50 additions & 126 deletions example/resistanceSnapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,168 +2,92 @@
"template": {
"start": "${ (produceParams.data; $millis()) }",
"produceParams": {
"type": "rebelDispatch",
"type": "my-topic",
"client": {
"type": "test",
"data": [
"luke",
"han",
"leia"
]
"data": "${['han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge']}"
}
},
"subscribeParams": {
"source": "cloudEvent",
"type": "/${ produceParams.type }",
"to": "/${saveRebelWorkflow}",
"to": "/${ function($rebel) { $forked( \"/rebel\", $rebel) } }",
"subscriberId": "rebelArmy",
"initialPosition": "latest",
"parallelism": 1,
"acks": [],
"parallelism": 2,
"client": {
"type": "test"
"type": "test",
"acks": []
}
},
"saveRebelWorkflow": {
"function": "/${ \n function($rebel){ \n $rebel ~> $serial(\n [fetchRebel, saveRebel],\n {'workflowInvocation': $rebel} \n ) } }\n"
},
"fetchRebel": {
"function": "/${ \n function($rebel){(\n $console.debug('fetchRebel input: ' & $rebel);\n $r := $rebel.$fetch('https://swapi.tech/api/people/?search='&$).json().results[0];\n $console.debug('fetchRebel fetched: ' & $r); \n $set('/fetchLog/-',$rebel);\n $console.debug('logged fetch: ' & $r);\n $r;\n )} \n}\n"
},
"saveRebel": {
"function": "/${ \n function($rebel){(\n $console.debug('saveRebel input: ' & $rebel);\n ($count(rebels) = 1 and simulateFailure)?(\n $set('/simulateFailure', false); \n $console.log('sleep forever on : ' & $rebel);\n $sleep(1000000);\n );\n $rebel ? $set('/rebels/-',{'name':$rebel.name, 'url':$rebel.homeworld});\n $console.debug('saveRebel saved: ' & {'name':$rebel.name, 'url':$rebel.homeworld});\n )} \n}\n"
},
"rebel": "luke",
"joinResistance": "/${( \n $sleep($random() * 1000);\n $details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name;\n $joined( \"/rebelForces/-\", $details);\n )}\n",
"send$": "$publish(produceParams)",
"recv$": "$subscribe(subscribeParams)",
"rebels": [],
"fetchLog": [],
"simulateFailure": true,
"runtime": "${ (rebelForces; \"Rebel forces assembled in \" & $string($millis()-start) & \" ms\")}"
"rebelForces": []
},
"output": {
"start": 1709667624225,
"start": 1716236982234,
"produceParams": {
"type": "rebelDispatch",
"type": "my-topic",
"client": {
"type": "test",
"data": [
"luke",
"han",
"leia"
"leia",
"R2-D2",
"Owen",
"Biggs",
"Obi-Wan",
"Anakin",
"Chewbacca",
"Wedge"
]
}
},
"subscribeParams": {
"source": "cloudEvent",
"type": "rebelDispatch",
"to": {
"function": "{function:}"
},
"type": "my-topic",
"to": "{function:}",
"subscriberId": "rebelArmy",
"initialPosition": "latest",
"parallelism": 1,
"acks": [
"luke"
],
"parallelism": 2,
"client": {
"type": "test"
}
},
"saveRebelWorkflow": {
"function": "{function:}"
},
"fetchRebel": {
"function": "{function:}",
"log": {
"han": {
"start": {
"timestamp": 1709667625060,
"args": "han"
},
"end": {
"timestamp": 1709667625619,
"out": {
"name": "Han Solo",
"height": "180",
"mass": "80",
"hair_color": "brown",
"skin_color": "fair",
"eye_color": "brown",
"birth_year": "29BBY",
"gender": "male",
"homeworld": "https://swapi.tech/api/planets/22/",
"films": [
"https://swapi.tech/api/films/1/",
"https://swapi.tech/api/films/2/",
"https://swapi.tech/api/films/3/"
],
"species": [],
"vehicles": [],
"starships": [
"https://swapi.tech/api/starships/10/",
"https://swapi.tech/api/starships/22/"
],
"created": "2014-12-10T16:49:14.582000Z",
"edited": "2014-12-20T21:17:50.334000Z",
"url": "https://swapi.tech/api/people/14/"
}
}
}
}
},
"saveRebel": {
"function": "{function:}",
"log": {
"han": {
"start": {
"timestamp": 1709667625619,
"args": {
"name": "Han Solo",
"height": "180",
"mass": "80",
"hair_color": "brown",
"skin_color": "fair",
"eye_color": "brown",
"birth_year": "29BBY",
"gender": "male",
"homeworld": "https://swapi.tech/api/planets/22/",
"films": [
"https://swapi.tech/api/films/1/",
"https://swapi.tech/api/films/2/",
"https://swapi.tech/api/films/3/"
],
"species": [],
"vehicles": [],
"starships": [
"https://swapi.tech/api/starships/10/",
"https://swapi.tech/api/starships/22/"
],
"created": "2014-12-10T16:49:14.582000Z",
"edited": "2014-12-20T21:17:50.334000Z",
"url": "https://swapi.tech/api/people/14/"
}
}
}
"type": "test",
"acks": [
"han",
"leia",
"R2-D2",
"Owen",
"Biggs",
"Obi-Wan",
"Anakin",
"Chewbacca",
"Wedge"
]
}
},
"rebel": "luke",
"joinResistance": null,
"send$": "done",
"recv$": "listening clientType=test ... ",
"rebels": [
{
"name": "Luke Skywalker",
"url": "https://swapi.tech/api/planets/1/"
}
],
"fetchLog": [
"luke",
"han"
],
"simulateFailure": false,
"runtime": "Rebel forces assembled in 0 ms"
"rebelForces": [
"Luke Skywalker",
"R2-D2",
"Obi-Wan Kenobi",
"Owen Lars",
"Han Solo",
"Anakin Skywalker",
"Leia Organa",
"Biggs Darklighter",
"Chewbacca",
"Wedge Antilles"
]
},
"options": {
"snapshot": {
"seconds": 1
}
},
"importPath": "/Users/sesergee/projects/sandbox/workflows/stated-workflow"
}
}
32 changes: 32 additions & 0 deletions src/test/StatedREPL.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,38 @@ import StatedREPL from "stated-js/dist/src/StatedREPL.js";
import {StatedWorkflow} from "../workflow/StatedWorkflow.js";
import {WorkflowDispatcher} from "../workflow/WorkflowDispatcher.js";


// basic test of workflow functions available in the repl.
test("restore debugging", async () => {
const originalCmdLineArgsStr = process.argv.slice(2).join(" ");
try {
process.argv = ["node", "stated-workflow.js"];
const defaultOpts = {
snapshot: {
storage: 'fs',
}
};
const statedWorkflow = await StatedWorkflow.newWorkflow(undefined, undefined, defaultOpts);
let {templateProcessor:tp} = statedWorkflow;
const repl = new StatedREPL(tp);

statedWorkflow.workflowDispatcher = new WorkflowDispatcher({});
repl.cliCore.onInit = statedWorkflow.workflowDispatcher.clear.bind(statedWorkflow.workflowDispatcher);

await repl.cliCore.init('.restore -f "example/resistanceSnapshot.json"', true);
tp = repl.cliCore.templateProcessor;
while (tp.output.stop$ !== 'missionAccomplished') {
await new Promise(resolve => setTimeout(resolve, 50));
}
console.log(tp.output);
} catch (e) {
console.log(e);
throw(e);
} finally {
process.argv = originalCmdLineArgsStr;
}
})

// basic test of workflow functions available in the repl.
test.skip("debug", async () => {
const originalCmdLineArgsStr = process.argv.slice(2).join(" ");
Expand Down
2 changes: 1 addition & 1 deletion src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ test.skip("subscribePulsar with pulsarMock client", async () => {
* before the snapshot was taken.
* 8. validates the acks for each rebel
*/
test("workflow snapshot and restore", async () => {
test.skip("workflow snapshot and restore", async () => {

// Load the YAML from the file
const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'joinResistanceRecovery.yaml');
Expand Down
2 changes: 1 addition & 1 deletion src/workflow/SnapshotManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class SnapshotManager {
console.error(`Failed to save snapshot to ${path}:`, error);
throw error;
}
} if (storage === "knowledge") {
} else if (storage === "knowledge") {
await SnapshotManager.writeToKnowledge(snapshotStr);
} else {
tp.logger.info('Storage method not supported.');
Expand Down
3 changes: 3 additions & 0 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ export class StatedWorkflow {
this.logger.debug(`subscribing ${StatedREPL.stringify(source)}`);

const subscribeOptionsJsonPointer = Array.isArray(resolvedJsonPointers) && resolvedJsonPointers.length > 0 ? resolvedJsonPointers[0] : undefined;
if (tp && tp.out(subscribeOptionsJsonPointer) !== undefined) {
subscribeOptions = tp.out(subscribeOptionsJsonPointer);
}
if(!this.workflowDispatcher) {
this.workflowDispatcher = new WorkflowDispatcher(subscribeOptions);
}
Expand Down
Loading

0 comments on commit 791146c

Please sign in to comment.