diff --git a/example/backpressure.yaml b/example/backpressure.yaml new file mode 100644 index 0000000..9a04a0b --- /dev/null +++ b/example/backpressure.yaml @@ -0,0 +1,34 @@ +# producer will be sending some test data +produceParams: + type: "my-topic" + data: ${[1..100].('bleep_' & $string($))} + client: + type: test +# the subscriber's 'to' function will be called on each received event +slowSubscribeParams: #parameters for subscribing to an event + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events + to: /${slowAntenna} + subscriberId: slowAntenna + parallelism: 4 + client: + type: test +fastSubscribeParams: + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events + to: /${fastAntenna} + subscriberId: fastAntenna + parallelism: 2 + client: + type: test +slowAntenna: /${ function($bleep){($sleep(100);$set('/rxSlow/-', $bleep))} } +fastAntenna: /${ function($bleep){($set('/rxFast/-', $bleep))} } +# starts producer function +send$: $publish(produceParams) +# starts consumer function +recvSlow$: $subscribe(slowSubscribeParams) +recvFast$: $subscribe(fastSubscribeParams) +# the subscriber's `to` function will write the received data here +rxFast: [ ] +rxSlow: [ ] +done: ${ $count(rxFast) + $count(rxSlow) = 200?'bleeps received':'still bleeping'} \ No newline at end of file diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index c9a075e..9da663d 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -21,6 +21,8 @@ import StatedREPL from "stated-js/dist/src/StatedREPL.js"; import {EnhancedPrintFunc} from "./TestTools.js"; import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; import util from "util"; +import {fn} from "jest-mock"; + const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -773,7 +775,7 @@ test.skip("Template Data Change Callback with rate limit", async () => { expect(counts).toEqual([0,10]); }); - +/* const isMacOS = process.platform === 'darwin'; if (isMacOS) { test("Pulsar consumer integration test", async () => { @@ -868,3 +870,37 @@ if (isMacOS) { }, 300000) } + + */ + +test("backpressure due to max parallelism", async () => { + + // Load the YAML from the file + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'backpressure.yaml'); + const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); + var template = yaml.load(templateYaml); + const {templateProcessor:tp} = await StatedWorkflow.newWorkflow(template); + + tp.setDataChangeCallback("/done", (d)=>{ + if(d==="bleeps received"){ + function testActivityRecord(r, expectedMaxActive){ + const {active, queue, backpressure} = r; + expect(Math.max(...active)).toBe(expectedMaxActive); + expect(Math.max(...queue)).toBe(0); //queue acts like a transfer queue and will not grow + expect(backpressure.every(v=>v===true)).toBe(true); + } + testActivityRecord(tp.output.slowSubscribeParams.activityRecord.slowAntenna, 4); + testActivityRecord(tp.output.fastSubscribeParams.activityRecord.fastAntenna, 2); + + expect(tp.output.rxSlow.length).toBe(100); + expect(tp.output.rxFast.length).toBe(100); + done(); + } + }); + await tp.initialize(); + + + +}); + + diff --git a/src/test/TestTools.js b/src/test/TestTools.js index 9793a82..4528819 100644 --- a/src/test/TestTools.js +++ b/src/test/TestTools.js @@ -118,3 +118,9 @@ function sortLogs(output, workflowName) { return aOrder - bOrder; }); } + +export class Delay{ + static async start(delayMs){ + await new Promise(resolve => setTimeout(resolve, delayMs)) + } +} diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index ce91642..fe2315d 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -28,6 +28,7 @@ import jp from "stated-js/dist/src/JsonPointer.js"; import util from "util"; import fs from "fs"; import path from "path"; +import {Delay} from "../test/TestTools.js" const writeFile = util.promisify(fs.writeFile); const basePath = path.join(process.cwd(), '.state'); @@ -72,7 +73,8 @@ export class StatedWorkflow { "publish": this.publish.bind(this), "logFunctionInvocation": this.logFunctionInvocation.bind(this), "workflow": this.workflow.bind(this), - "recover": this.recover.bind(this) + "recover": this.recover.bind(this), + "sleep": Delay.start } }; this.templateProcessor = new TemplateProcessor(template, context); @@ -173,7 +175,7 @@ export class StatedWorkflow { }); } - publish(params) { + async publish(params) { this.logger.debug(`publish params ${StatedREPL.stringify(params)} }`); const {data, type, client:clientParams={}} = params; @@ -185,7 +187,7 @@ export class StatedWorkflow { if (clientParams && clientParams.type === 'test') { this.logger.debug(`test client provided, will not publish to 'real' message broker for publish parameters ${StatedREPL.stringify(params)}`); - this.workflowDispatcher.addBatchToAllSubscribers(type, data); + await this.workflowDispatcher.addBatchToAllSubscribers(type, data); return "done"; } @@ -360,8 +362,8 @@ export class StatedWorkflow { }); - - this.workflowDispatcher.dispatchToAllSubscribers(type, obj); + //if the dispatchers max parallelism is reached this loop should block, which is why we await + await this.workflowDispatcher.dispatchToAllSubscribers(type, obj); if(countdown && --countdown===0){ break; } @@ -468,9 +470,9 @@ export class StatedWorkflow { } onHttp(subscriptionParams) { - StatedWorkflow.app.all('*', (req, res) => { + StatedWorkflow.app.all('*', async (req, res) => { // Push the request and response objects to the dispatch queue to be handled by callback - this.workflowDispatcher.addToQueue({req, res}); + await this.workflowDispatcher.addToQueue({req, res}); }); StatedWorkflow.app.listen(StatedWorkflow.port, () => { @@ -514,7 +516,7 @@ export class StatedWorkflow { currentInput = await this.runStep(workflowInvocation, step, currentInput); } - if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps); + //if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps); return currentInput; } diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index db94af0..d003ed2 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -13,12 +13,14 @@ // limitations under the License. import StatedREPL from "stated-js/dist/src/StatedREPL.js"; import {StatedWorkflow} from "./StatedWorkflow.js"; +import jp from "stated-js/dist/src/JsonPointer.js"; // This class is used to add events to a queue and dispatch them to one or more subscribed workflow function with the // given parallelism. Tracks the number of active events and the number of events in the queue. export class WorkflowDispatcher { constructor(subscribeParams) { const {to: workflowFunction, parallelism, type, subscriberId} = subscribeParams; + this.subscribeParams = subscribeParams; this.workflowFunction = workflowFunction; this.parallelism = parallelism || 1; this.subscriberId = subscriberId; @@ -64,19 +66,19 @@ export class WorkflowDispatcher { if (keysSet) { for (let key of keysSet) { const dispatcher = this.dispatcherObjects.get(key); - await dispatcher.addBatch(testData); // You can pass the actual data you want to dispatch here + dispatcher.addBatch(testData); // You can pass the actual data you want to dispatch here } } else { console.log(`No subscribers found for type ${type}`); } } - dispatchToAllSubscribers(type, data) { + async dispatchToAllSubscribers(type, data) { const keysSet = this.dispatchers.get(type); if (keysSet) { for (let key of keysSet) { const dispatcher = this.dispatcherObjects.get(key); - dispatcher.addToQueue(data); // You can pass the actual data you want to dispatch here + await dispatcher.addToQueue(data); // You can pass the actual data you want to dispatch here } } else { StatedWorkflow.logger.warn(`No subscribers found for type ${type}`); @@ -119,11 +121,48 @@ export class WorkflowDispatcher { } } - addToQueue(data) { - // TODO: - this.queue.push(data); - this._dispatch(); + _logActivity(key, val) { + let record; + const {subscribeParams} = this; + const {maxLog = 10, subscriberId} = subscribeParams; + const path = "/activityRecord/"+subscriberId+"/"+key; + if(!jp.has(subscribeParams, path)){ + record = []; + jp.set(subscribeParams, path, record); + }else{ + record = jp.get(subscribeParams, path); + } + if (record.push(val) > maxLog) { + record.shift(); //we keep a history of the active count for 10 values over time + } } + async addToQueue(data) { + + return new Promise(async (resolve, reject) => { + const tryAddToQueue = async () => { + this._logActivity("active", this.active); + this._logActivity("queue", this.queue.length); + if (this.active < this.parallelism) { + this.queue.push(data); + resolve(); // Resolve the promise to signal that the data was queued + this._dispatch(); // Attempt to dispatch the next task + } else { + // If parallelism limit is reached, wait for any active task to complete + try { + this._logActivity("backpressure", true); + await Promise.race(this.promises); + // Once a task completes, try adding to the queue again + tryAddToQueue(); + } catch (error) { + reject(error); // If waiting for a task to complete results in an error, reject the promise + } + } + }; + + tryAddToQueue(); + }); + } + //this is used for testing async addBatch(testData) { @@ -134,10 +173,12 @@ export class WorkflowDispatcher { this.batchMode = true; if (Array.isArray(testData)) { this.batchCount += testData.length; - testData.forEach(data => this.addToQueue(data)); + for(let i=0;i