Skip to content

Commit

Permalink
backpressure (#16)
Browse files Browse the repository at this point in the history
Co-authored-by: Geoffrey Hendrey <[email protected]>
  • Loading branch information
geoffhendrey and Geoffrey Hendrey authored Feb 18, 2024
1 parent eee2b99 commit c94f007
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 18 deletions.
34 changes: 34 additions & 0 deletions example/backpressure.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${[1..100].('bleep_' & $string($))}
client:
type: test
# the subscriber's 'to' function will be called on each received event
slowSubscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${slowAntenna}
subscriberId: slowAntenna
parallelism: 4
client:
type: test
fastSubscribeParams:
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${fastAntenna}
subscriberId: fastAntenna
parallelism: 2
client:
type: test
slowAntenna: /${ function($bleep){($sleep(100);$set('/rxSlow/-', $bleep))} }
fastAntenna: /${ function($bleep){($set('/rxFast/-', $bleep))} }
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recvSlow$: $subscribe(slowSubscribeParams)
recvFast$: $subscribe(fastSubscribeParams)
# the subscriber's `to` function will write the received data here
rxFast: [ ]
rxSlow: [ ]
done: ${ $count(rxFast) + $count(rxSlow) = 200?'bleeps received':'still bleeping'}
38 changes: 37 additions & 1 deletion src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import StatedREPL from "stated-js/dist/src/StatedREPL.js";
import {EnhancedPrintFunc} from "./TestTools.js";
import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js";
import util from "util";
import {fn} from "jest-mock";


const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
Expand Down Expand Up @@ -773,7 +775,7 @@ test.skip("Template Data Change Callback with rate limit", async () => {
expect(counts).toEqual([0,10]);

});

/*
const isMacOS = process.platform === 'darwin';
if (isMacOS) {
test("Pulsar consumer integration test", async () => {
Expand Down Expand Up @@ -868,3 +870,37 @@ if (isMacOS) {
}, 300000)
}
*/

test("backpressure due to max parallelism", async () => {

// Load the YAML from the file
const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'backpressure.yaml');
const templateYaml = fs.readFileSync(yamlFilePath, 'utf8');
var template = yaml.load(templateYaml);
const {templateProcessor:tp} = await StatedWorkflow.newWorkflow(template);

tp.setDataChangeCallback("/done", (d)=>{
if(d==="bleeps received"){
function testActivityRecord(r, expectedMaxActive){
const {active, queue, backpressure} = r;
expect(Math.max(...active)).toBe(expectedMaxActive);
expect(Math.max(...queue)).toBe(0); //queue acts like a transfer queue and will not grow
expect(backpressure.every(v=>v===true)).toBe(true);
}
testActivityRecord(tp.output.slowSubscribeParams.activityRecord.slowAntenna, 4);
testActivityRecord(tp.output.fastSubscribeParams.activityRecord.fastAntenna, 2);

expect(tp.output.rxSlow.length).toBe(100);
expect(tp.output.rxFast.length).toBe(100);
done();
}
});
await tp.initialize();



});


6 changes: 6 additions & 0 deletions src/test/TestTools.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,9 @@ function sortLogs(output, workflowName) {
return aOrder - bOrder;
});
}

export class Delay{
static async start(delayMs){
await new Promise(resolve => setTimeout(resolve, delayMs))
}
}
18 changes: 10 additions & 8 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import jp from "stated-js/dist/src/JsonPointer.js";
import util from "util";
import fs from "fs";
import path from "path";
import {Delay} from "../test/TestTools.js"

const writeFile = util.promisify(fs.writeFile);
const basePath = path.join(process.cwd(), '.state');
Expand Down Expand Up @@ -72,7 +73,8 @@ export class StatedWorkflow {
"publish": this.publish.bind(this),
"logFunctionInvocation": this.logFunctionInvocation.bind(this),
"workflow": this.workflow.bind(this),
"recover": this.recover.bind(this)
"recover": this.recover.bind(this),
"sleep": Delay.start
}
};
this.templateProcessor = new TemplateProcessor(template, context);
Expand Down Expand Up @@ -173,7 +175,7 @@ export class StatedWorkflow {
});
}

publish(params) {
async publish(params) {
this.logger.debug(`publish params ${StatedREPL.stringify(params)} }`);

const {data, type, client:clientParams={}} = params;
Expand All @@ -185,7 +187,7 @@ export class StatedWorkflow {

if (clientParams && clientParams.type === 'test') {
this.logger.debug(`test client provided, will not publish to 'real' message broker for publish parameters ${StatedREPL.stringify(params)}`);
this.workflowDispatcher.addBatchToAllSubscribers(type, data);
await this.workflowDispatcher.addBatchToAllSubscribers(type, data);
return "done";
}

Expand Down Expand Up @@ -360,8 +362,8 @@ export class StatedWorkflow {
});



this.workflowDispatcher.dispatchToAllSubscribers(type, obj);
//if the dispatchers max parallelism is reached this loop should block, which is why we await
await this.workflowDispatcher.dispatchToAllSubscribers(type, obj);
if(countdown && --countdown===0){
break;
}
Expand Down Expand Up @@ -468,9 +470,9 @@ export class StatedWorkflow {
}

onHttp(subscriptionParams) {
StatedWorkflow.app.all('*', (req, res) => {
StatedWorkflow.app.all('*', async (req, res) => {
// Push the request and response objects to the dispatch queue to be handled by callback
this.workflowDispatcher.addToQueue({req, res});
await this.workflowDispatcher.addToQueue({req, res});
});

StatedWorkflow.app.listen(StatedWorkflow.port, () => {
Expand Down Expand Up @@ -514,7 +516,7 @@ export class StatedWorkflow {
currentInput = await this.runStep(workflowInvocation, step, currentInput);
}

if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps);
//if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps);

return currentInput;
}
Expand Down
59 changes: 50 additions & 9 deletions src/workflow/WorkflowDispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
// limitations under the License.
import StatedREPL from "stated-js/dist/src/StatedREPL.js";
import {StatedWorkflow} from "./StatedWorkflow.js";
import jp from "stated-js/dist/src/JsonPointer.js";

// This class is used to add events to a queue and dispatch them to one or more subscribed workflow function with the
// given parallelism. Tracks the number of active events and the number of events in the queue.
export class WorkflowDispatcher {
constructor(subscribeParams) {
const {to: workflowFunction, parallelism, type, subscriberId} = subscribeParams;
this.subscribeParams = subscribeParams;
this.workflowFunction = workflowFunction;
this.parallelism = parallelism || 1;
this.subscriberId = subscriberId;
Expand Down Expand Up @@ -64,19 +66,19 @@ export class WorkflowDispatcher {
if (keysSet) {
for (let key of keysSet) {
const dispatcher = this.dispatcherObjects.get(key);
await dispatcher.addBatch(testData); // You can pass the actual data you want to dispatch here
dispatcher.addBatch(testData); // You can pass the actual data you want to dispatch here
}
} else {
console.log(`No subscribers found for type ${type}`);
}
}

dispatchToAllSubscribers(type, data) {
async dispatchToAllSubscribers(type, data) {
const keysSet = this.dispatchers.get(type);
if (keysSet) {
for (let key of keysSet) {
const dispatcher = this.dispatcherObjects.get(key);
dispatcher.addToQueue(data); // You can pass the actual data you want to dispatch here
await dispatcher.addToQueue(data); // You can pass the actual data you want to dispatch here
}
} else {
StatedWorkflow.logger.warn(`No subscribers found for type ${type}`);
Expand Down Expand Up @@ -119,11 +121,48 @@ export class WorkflowDispatcher {
}
}

addToQueue(data) {
// TODO:
this.queue.push(data);
this._dispatch();
_logActivity(key, val) {
let record;
const {subscribeParams} = this;
const {maxLog = 10, subscriberId} = subscribeParams;
const path = "/activityRecord/"+subscriberId+"/"+key;
if(!jp.has(subscribeParams, path)){
record = [];
jp.set(subscribeParams, path, record);
}else{
record = jp.get(subscribeParams, path);
}
if (record.push(val) > maxLog) {
record.shift(); //we keep a history of the active count for 10 values over time
}
}
async addToQueue(data) {

return new Promise(async (resolve, reject) => {
const tryAddToQueue = async () => {
this._logActivity("active", this.active);
this._logActivity("queue", this.queue.length);
if (this.active < this.parallelism) {
this.queue.push(data);
resolve(); // Resolve the promise to signal that the data was queued
this._dispatch(); // Attempt to dispatch the next task
} else {
// If parallelism limit is reached, wait for any active task to complete
try {
this._logActivity("backpressure", true);
await Promise.race(this.promises);
// Once a task completes, try adding to the queue again
tryAddToQueue();
} catch (error) {
reject(error); // If waiting for a task to complete results in an error, reject the promise
}
}
};

tryAddToQueue();
});
}


//this is used for testing
async addBatch(testData) {
Expand All @@ -134,10 +173,12 @@ export class WorkflowDispatcher {
this.batchMode = true;
if (Array.isArray(testData)) {
this.batchCount += testData.length;
testData.forEach(data => this.addToQueue(data));
for(let i=0;i<testData.length;i++){
await this.addToQueue(testData[i]);
}
} else {
this.batchCount += 1;
this.addToQueue(testData);
await this.addToQueue(testData);
}

}
Expand Down

0 comments on commit c94f007

Please sign in to comment.