Skip to content

Commit

Permalink
snapshots (#17)
Browse files Browse the repository at this point in the history
* wip. Try '.init -f example/backpressure-wf.yaml --options={snapshot:{}}'

* README++ and bug fixes

* lockfiles

* README++

* README++

---------

Co-authored-by: Geoffrey Hendrey <[email protected]>
  • Loading branch information
geoffhendrey and Geoffrey Hendrey authored Feb 23, 2024
1 parent c94f007 commit dc982e1
Show file tree
Hide file tree
Showing 15 changed files with 571 additions and 1,189 deletions.
1,317 changes: 228 additions & 1,089 deletions README.md

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions README.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import {parseMarkdownAndTestCodeblocks} from "stated-js/dist/src/TestUtils.js";
import {StatedWorkflow} from "./src/workflow/StatedWorkflow.js";


const workflow = await StatedWorkflow.newWorkflow();
await workflow.initialize();

let {templateProcessor:tp} = workflow;
const {templateProcessor:tp} = await StatedWorkflow.newWorkflow();
await tp.initialize();
const cliCore = new CliCore(tp);

parseMarkdownAndTestCodeblocks('./README.md', cliCore);
4 changes: 2 additions & 2 deletions example/backpressure.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${[1..100].('bleep_' & $string($))}
data: ${[1..10].('bleep_' & $string($))}
client:
type: test
# the subscriber's 'to' function will be called on each received event
Expand Down Expand Up @@ -31,4 +31,4 @@ recvFast$: $subscribe(fastSubscribeParams)
# the subscriber's `to` function will write the received data here
rxFast: [ ]
rxSlow: [ ]
done: ${ $count(rxFast) + $count(rxSlow) = 200?'bleeps received':'still bleeping'}
done: ${ $count(rxFast) + $count(rxSlow) = 20?'bleeps received':'still bleeping'}
25 changes: 25 additions & 0 deletions example/inhabitants.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${[1..6].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)}
client:
type: test
subscribeResidents:
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${ getResidentsWorkflow }
subscriberId: subscribeResidents
parallelism: 4
client:
type: test
getResidentsWorkflow:
function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } }
extractResidents:
function: /${ function($planet){$planet.residents.($fetch($).json())} }
fetchResidents:
function: /${ function($resident){$resident?$set('/residents/-',{'name':$resident.name, 'url':$resident.url})} }
residents: [ ]
# starts producer function
send$: $publish(produceParams)
recv$: $subscribe(subscribeResidents)

31 changes: 31 additions & 0 deletions example/joinResistance.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
start: ${ $millis() }
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)}
client:
type: test
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${joinResistance}
subscriberId: rebelArmy
initialPosition: latest
parallelism: 1
client:
type: test
joinResistance: |
/${
function($url){(
$rebel := $fetch($url).json().results[0].name;
$set( "/rebelForces/-", $rebel) /* append rebel to rebelForces */
)}
}
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
runtime: ${ (rebelForces; "Rebel forces assembled in " & $string($millis()-start) & " ms")}
31 changes: 31 additions & 0 deletions example/joinResistanceBug.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
start: ${ $millis() }
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)}
client:
type: test
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${joinResistance}
subscriberId: rebelArmy
initialPosition: latest
parallelism: 5
client:
type: test
joinResistance: |
/${
function($url){(
$rebel := $fetch($url).json().results[0].name;
$set( "/rebelForces", $rebelForces~>$append($rebel)) /* BUG!! */
)}
}
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
runtime: ${ (rebelForces; "Rebel forces assembled in " & $string($millis()-start) & " ms")}
31 changes: 31 additions & 0 deletions example/joinResistanceFast.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
start: ${ $millis() }
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)}
client:
type: test
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${joinResistance}
subscriberId: rebelArmy
initialPosition: latest
parallelism: 5
client:
type: test
joinResistance: |
/${
function($url){(
$rebel := $fetch($url).json().results[0].name;
$set( "/rebelForces/-", $rebel) /* append rebel to rebelForces */
)}
}
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
runtime: ${ (rebelForces; "Rebel forces assembled in " & $string($millis()-start) & " ms")}
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"express": "^4.18.2",
"kafkajs": "^2.2.4",
"pulsar-client": "^1.9.0",
"stated-js": "^0.0.106"
"stated-js": "^0.1.4"
},
"devDependencies": {
"jest": "^29.7.0"
Expand Down
16 changes: 8 additions & 8 deletions src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,8 @@ test("backpressure due to max parallelism", async () => {
const templateYaml = fs.readFileSync(yamlFilePath, 'utf8');
var template = yaml.load(templateYaml);
const {templateProcessor:tp} = await StatedWorkflow.newWorkflow(template);

let latch;
new Promise((resolve)=>{latch=resolve});
tp.setDataChangeCallback("/done", (d)=>{
if(d==="bleeps received"){
function testActivityRecord(r, expectedMaxActive){
Expand All @@ -889,18 +890,17 @@ test("backpressure due to max parallelism", async () => {
expect(Math.max(...queue)).toBe(0); //queue acts like a transfer queue and will not grow
expect(backpressure.every(v=>v===true)).toBe(true);
}
/* activity record is kind of interal thing for debugging, don't need to test it
testActivityRecord(tp.output.slowSubscribeParams.activityRecord.slowAntenna, 4);
testActivityRecord(tp.output.fastSubscribeParams.activityRecord.fastAntenna, 2);

expect(tp.output.rxSlow.length).toBe(100);
expect(tp.output.rxFast.length).toBe(100);
done();
*/
expect(tp.output.rxSlow.length).toBe(10);
expect(tp.output.rxFast.length).toBe(10);
latch();
}
});
await tp.initialize();



await latch;
});


26 changes: 26 additions & 0 deletions src/workflow/Snapshot.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { promises as fs } from 'fs';

export class Snapshot {

static async write(tp) {
const {snapshot: snapshotOpts} = tp.options;
if(!snapshotOpts){
tp.logger.debug("no --snapshot options defined, skipping snapshot");
return;
}
const snapshotStr = tp.snapshot();
const {storage = "fs", path = "./defaultSnapshot.json"} = snapshotOpts; // Default path if not provided

if (storage === "fs") {
try {
await fs.writeFile(path, snapshotStr);
tp.logger.info(`Snapshot saved to ${path}`);
} catch (error) {
console.error(`Failed to save snapshot to ${path}:`, error);
throw error;
}
} else {
tp.logger.info('Storage method not supported.');
}
}
}
Loading

0 comments on commit dc982e1

Please sign in to comment.