diff --git a/example/inhabitants-with-delay.yaml b/example/inhabitants-with-delay.yaml new file mode 100644 index 0000000..7ca9d27 --- /dev/null +++ b/example/inhabitants-with-delay.yaml @@ -0,0 +1,26 @@ +# producer will be sending some test data +produceParams: + type: "residents" + # fetch one page of planet data from the Star Wars API + data: ${[1].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)} + client: + type: test +subscribeResidents: + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events + to: /${ getResidentsWorkflow } + subscriberId: subscribeResidents + parallelism: 4 + client: + type: test +getResidentsWorkflow: + function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } } +extractResidents: + function: /${ function($planet){( $sleep($random() * 10) ; $planet.residents.($fetch($).json()) )} } # add a random delay +fetchResidents: + function: /${ function($residents){$residents.($set('/residents/-',{'name':$.name, 'url':$.url}))} } +residents: [ ] +# starts producer function +send$: $publish(produceParams) +recv$: $subscribe(subscribeResidents) + diff --git a/example/pubsub-data-function-pulsar.yaml b/example/pubsub-data-function-pulsar.yaml index a312b7c..f6f9f19 100644 --- a/example/pubsub-data-function-pulsar.yaml +++ b/example/pubsub-data-function-pulsar.yaml @@ -4,7 +4,7 @@ produceParams: data: | ${ function(){ - {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $floor($random()*10)+1 } + {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $ceil($random()*10) } } } client: diff --git a/example/rebelCommunication.yaml b/example/rebelCommunication.yaml new file mode 100644 index 0000000..1ae860d --- /dev/null +++ b/example/rebelCommunication.yaml @@ -0,0 +1,51 @@ +# Droid R2-D2 is sending messages to the Rebel Alliance's communication channel +produceParams: + type: "rebel-comm-channel" + data: | + ${ + function(){ + {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $ceil($random()*10) } + } + } + client: + type: pulsarMock +# Droid C-3PO will intercept and log each received message for the Rebel Alliance +subscribeParams: #parameters for subscribing to a holocomm transmission + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same channel as R2-D2 to intercept messages + to: /${processMessageWorkflow} + subscriberId: protocolDroid + parallelism: 5 + initialPosition: latest + client: + type: pulsarMock +processMessageWorkflow: /${ function($message){ $message ~> $serial([step1, step2, step3]) }} +step1: + name: logMessage + function: /${ function($e){( $set('/interceptedMessages/-', $e); $e)} } +step2: + name: identifySender + function: | + ${ function($e){( + { + 'sender': $e.sender, + 'name': $fetch('https://swapi.dev/api/people/'& $e.sender).json().name, + 'location': $e.location + } + )} } +step3: + name: classifyLocation + function: | + /${ function($e){( + $e.location > 0.5 ? $set('/farFarAway/-', $e) : $set('/nearBy/-', $e); $e + )} } +# Activates R2-D2's message transmission function every 50 milliseconds +send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }" +# Activates C-3PO's message interception function +recv$: $subscribe(subscribeParams) +# interceptedMessages is where C-3PO will store the results of message decoding +interceptedMessages: [ ] +farFarAway: [ ] +nearBy: [ ] +# This condition stops the transmission operation when interceptedMessages has 10 elements +stop$: ($count(interceptedMessages)>=10?($clearInterval(send);'transmissionAccomplished'):'transmissionOngoing') diff --git a/src/test/PulsarMock.js b/src/test/PulsarMock.js new file mode 100644 index 0000000..c26a3cb --- /dev/null +++ b/src/test/PulsarMock.js @@ -0,0 +1,251 @@ +export class PulsarClientMock { + // A map used to simulate an in-memory store for messages. The keys are topic names, and the values are arrays of message objects. + static inMemoryStore = new Map(); + + // A counter to generate unique message IDs. It is incremented each time a message is sent. + static messageIdCounter = 0; + + // A map where keys are topic names and values are arrays of receive function invocations (listeners). These listeners are called to notify about new messages. + static listeners = new Map(); + + // The default time in milliseconds to wait before a message is considered not acknowledged (acknowledged messages are removed from visibility to simulate message acknowledgment behavior). + static ackTimeout = 30000; + + // A nested map where the first key is the topic name, the second key is the subscriber ID, and the value is a set of message IDs that have been acknowledged by the subscriber. + static acknowledgedMessages = new Map(); + + static configureAckTimeout(timeout) { + this.ackTimeout = timeout; + } + + async createProducer(config) { + const topic = config.topic; + if (!PulsarClientMock.inMemoryStore.has(topic)) { + PulsarClientMock.inMemoryStore.set(topic, []); + } + + return { + send: async (message) => { + const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); + const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + messages.push({ message: messageInstance, subscriberIds: new Set() }); // Track which subscribers have received the message + + // Ensure all subscribers are aware of the new message + PulsarClientMock.notifyListeners(topic); + + return { messageId: messageId.toString() }; + }, + close: async () => {}, + }; + } + + /** + * creates a consumer per topic and subscriberId. If more than one subscriber with the same subscriberId is created + * for the same topic, they will process messages in a FIFO manner. + */ + + async subscribe(config) { + const topic = config.topic; + const subscriberId = config.subscription; + + + return { + + /** + * receives either returns the next visible message, or blocks until a message is available. + */ + receive: () => { + return new Promise((resolve) => { + const tryResolve = () => { + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + const messageIndex = messages.findIndex(m => !m.subscriberIds.has(subscriberId)); + if (messageIndex !== -1) { // there a message available for this subscriber + const message = messages[messageIndex]; + message.subscriberIds.add(subscriberId); // Mark as received by this subscriber + + // Make the message acknowledged after a timeout for this subscriber + setTimeout(() => { + if (!PulsarClientMock.isAcknowledged(topic, message.message.messageId.id, subscriberId)) { + // If not acknowledged, make it visible again to all subscribers + message.subscriberIds.delete(subscriberId); + PulsarClientMock.notifyListeners(topic); + } + }, PulsarClientMock.ackTimeout); + + resolve(message.message); + + } else { // no message available for this subscriber, wait for the next one + if (!PulsarClientMock.listeners.has(topic)) { + PulsarClientMock.listeners.set(topic, []); + } + // add this function invocation to the list of listeners for this topic + PulsarClientMock.listeners.get(topic).push(tryResolve); + } + }; + + tryResolve(); + }); + }, + acknowledge: async (message) => {/**/ + PulsarClientMock.acknowledgeMessage(topic, message.messageId.id, subscriberId); + }, + acknowledgeId: async (messageId) => { + PulsarClientMock.acknowledgeMessage(topic, messageId.id, subscriberId); + }, + close: async () => {}, + }; + } + + static isAcknowledged(topic, messageId, subscriberId) { + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined; + return subscriberAcks ? subscriberAcks.has(messageId) : false; + } + + static acknowledgeMessage(topic, messageId, subscriberId) { + if (!this.acknowledgedMessages.has(topic)) { + this.acknowledgedMessages.set(topic, new Map()); + } + + const topicAcks = this.acknowledgedMessages.get(topic); + if (!topicAcks.has(subscriberId)) { + topicAcks.set(subscriberId, new Set()); + } + + const subscriberAcks = topicAcks.get(subscriberId); + subscriberAcks.add(messageId); + } + + static getTopics() { + return Array.from(PulsarClientMock.inMemoryStore.keys()); + } + + static getStats(topic, subscriberId) { + const messages = this.inMemoryStore.get(topic) || []; + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? (topicAcks.get(subscriberId) || new Set()) : new Set(); + + // Calculate inFlight count as messages not yet acknowledged by this subscriber + const inFlight = messages.filter(m => !subscriberAcks.has(m.message.messageId.id)).length; + // Queue length includes messages not yet received or acknowledged by this subscriber + const queueLength = messages.length - subscriberAcks.size; + + return { + acknowledgedCount: subscriberAcks.size, + inFlightCount: inFlight, + queueLength, + }; + } + + static getAcknowledgedMessages(topic, subscriberId) { + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined; + + if (!subscriberAcks) { + return []; + } + + const messages = this.inMemoryStore.get(topic) || []; + // Filter messages that have been acknowledged by the subscriber + return messages + .filter(m => subscriberAcks.has(m.message.messageId.id)) + .map(m => m.message); + } + + /** + * Notify all listeners for a topic about a message available. + * A listener is a receive invocation that is waiting for a message to be available. + * A message can be a new one, or the one with expired visibility timeout + */ + static notifyListeners(topic) { + const listeners = PulsarClientMock.listeners.get(topic) || []; + while (listeners.length > 0) { + const listener = listeners.shift(); + listener(); + } + } + + static clear() { + PulsarClientMock.inMemoryStore.clear(); + PulsarClientMock.listeners.clear(); + PulsarClientMock.acknowledgedMessages.clear(); + } + + close() { + return Promise.resolve(null); + } +} + +export class MessageId { + constructor(id) { + this.id = id; + } + + static earliest() { + return new MessageId("earliest"); + } + + static latest() { + return new MessageId("latest"); + } + + static deserialize(data) { + // Assuming the input data is a Buffer containing a string ID + return new MessageId(data.toString()); + } + + serialize() { + // Convert the ID to a Buffer + return Buffer.from(this.id); + } + + toString() { + return this.id; + } +} + +export class Message { + constructor(topicName, properties, data, messageId, publishTimestamp, eventTimestamp, redeliveryCount, partitionKey) { + this.topicName = topicName; + this.properties = properties; + this.data = data; + this.messageId = messageId; + this.publishTimestamp = publishTimestamp; + this.eventTimestamp = eventTimestamp; + this.redeliveryCount = redeliveryCount; + this.partitionKey = partitionKey; + } + + getTopicName() { + return this.topicName; + } + + getProperties() { + return this.properties; + } + + getData() { + return this.data; + } + + getMessageId() { + return this.messageId; + } + + getPublishTimestamp() { + return this.publishTimestamp; + } + + getEventTimestamp() { + return this.eventTimestamp; + } + + getRedeliveryCount() { + return this.redeliveryCount; + } + + getPartitionKey() { + return this.partitionKey; + } +} \ No newline at end of file diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index a7dd914..eedf3c9 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -16,12 +16,13 @@ import fs from 'fs'; import yaml from 'js-yaml'; import { fileURLToPath } from 'url'; import path from 'path'; -import {WorkflowDispatcher} from "../workflow/WorkflowDispatcher.js"; import StatedREPL from "stated-js/dist/src/StatedREPL.js"; import {EnhancedPrintFunc} from "./TestTools.js"; import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; import util from "util"; import {fn} from "jest-mock"; +import {PulsarClientMock} from "./PulsarMock.js"; +import Pulsar from "pulsar-client"; const __filename = fileURLToPath(import.meta.url); @@ -550,7 +551,7 @@ test("recover incomplete workflow - step 1 is incomplete - should rerun steps 1 } } } - } + } step2: name: sprayTheNozzle function: \${function($e){ $e~>|$|{'sprayed':true}| }} @@ -775,9 +776,16 @@ test.skip("Template Data Change Callback with rate limit", async () => { expect(counts).toEqual([0,10]); }); -/* -const isMacOS = process.platform === 'darwin'; -if (isMacOS) { + +/** + * Pulsar Integration Tests + * + * 1. start docker-compose + * docker-compose -f docker/docker-compose.yaml up -d + * 2. run the tests with ENABLE_INTEGRATION_TESTS set to "true" + * ENABLE_INTEGRATION_TESTS=true yarn test StatedWorkflow.test.js + */ +if (process.env.ENABLE_INTEGRATION_TESTS === "true") { test("Pulsar consumer integration test", async () => { const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-pulsar.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); @@ -785,7 +793,7 @@ if (isMacOS) { const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template); // keep steps execution logs for debugging - tp.options = {'keepLogs': true} + tp.options = {'keepLogs': true, 'snapshot': {'snapshotIntervalSeconds': 0.01}}; await tp.initialize(); @@ -871,8 +879,6 @@ if (isMacOS) { }, 300000) } - */ - test("backpressure due to max parallelism", async () => { // Load the YAML from the file @@ -904,3 +910,121 @@ test("backpressure due to max parallelism", async () => { }); +/** + * This test validates that the workflow can be recovered from a snapshot. + */ +test("Snapshot and recover for workflow", async () => { + // Logging function to console.log with date stamps + const logWithDate = (message) => { + console.log(`${new Date().toISOString()}: ${message}`); + }; + + // Load the YAML from the file + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'inhabitants-with-delay.yaml'); + const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); + var template = yaml.load(templateYaml); + const sw = await StatedWorkflow.newWorkflow(template); + const {templateProcessor: tp} = sw; + tp.options = {'snapshot': {'snapshotIntervalSeconds': 0.01}} + + const defaultSnapshotPath = path.join(__dirname, '../', '../','defaultSnapshot.json'); + + // Make sure default snapshot is deleted before the test + try { + await unlink(defaultSnapshotPath); + logWithDate(`Deleted previous default snapshot file: ${defaultSnapshotPath}`); + } catch (e) { + if (e.code !== 'ENOENT') { + throw e; + } + } + + await tp.initialize(); + logWithDate("Initialized stated workflow template..."); + + let anyResidentsSnapshotted = false; + let snapshot; + + // Wait for the snapshot file to include at least 10 residents + while (!anyResidentsSnapshotted) { + try { + const snapshotContent = fs.readFileSync(defaultSnapshotPath, 'utf8'); + snapshot = JSON.parse(snapshotContent); + logWithDate(`Snapshot has ${snapshot.output.residents.length} residents`); + if (snapshot.output?.residents?.length > 5) { + anyResidentsSnapshotted = true; + break; + } + } catch (e) { + logWithDate(`Error checking snapshot residents: ${e.message}`); + } + await new Promise(resolve => setTimeout(resolve, 1000)); // Poll every 1s + } + + // Kill the wf + logWithDate("Stopping stated workflow..."); + await sw.close(); + logWithDate(`Stopped stated workflow, with ${tp.output.residents.length} residents`); + + logWithDate(`Recovering from a snapshot with ${snapshot.output.residents.length} residents`); + await tp.initialize(snapshot.template, '/', snapshot.output); + + // Calculate unique residents + let uniqResidents = 0; + do { + uniqResidents = Object.keys(tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {})).length; + logWithDate(`Got ${uniqResidents} unique residents processed`); + await new Promise(resolve => setTimeout(resolve, 1000)); + } while (uniqResidents < 32) + + logWithDate(`We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); + await sw.close(); + logWithDate("Stopped stated workflow before test end"); +}, 20000); // 20s timeout for times swapi not behaving + + +test("subscribePulsar with pulsarMock client", async () => { + + const defaultSnapshotPath = path.join(__dirname, '../', '../','defaultSnapshot.json'); + try { + await unlink(defaultSnapshotPath); + console.log(`Deleted previous default snapshot file: ${defaultSnapshotPath}`); + } catch (e) { + if (e.code !== 'ENOENT') { + throw e; + } + } + + PulsarClientMock.clear(); + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'rebelCommunication.yaml'); + const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); + let template = yaml.load(templateYaml); + + const sw = await StatedWorkflow.newWorkflow(template); + await sw.close(); + const {templateProcessor: tp} = sw; + // keep steps execution logs for debugging + tp.options = {'keepLogs': true, 'snapshot': {}}; + + await tp.initialize(); + + while (tp.output.farFarAway?.length + tp.output.nearBy?.length < 10) { + console.log(`Waiting for at least 10 messages. So far received from farFarAway: ${tp.output.farFarAway?.length}, from nearBy: ${tp.output.nearBy?.length}`); + await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 50ms + } + console.log(`Received 10 or more messages. Received from farFarAway: ${tp.output.farFarAway?.length}, from nearBy: ${tp.output.nearBy?.length}`); + + expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(10) + expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toBeGreaterThanOrEqual(10); + + console.log("waiting for at least 10 messages to be acknowledged"); + const topic = PulsarClientMock.getTopics()[0]; // we use only one topic in the test + const subscriberId = tp.output.subscribeParams.type; + + while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(topic)) + || PulsarClientMock.getAcknowledgedMessages(topic, subscriberId).length < 10) { + console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`); + await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 500ms + }; + console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`); +}, 200000) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index dc180b3..bc8f6a9 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -19,17 +19,17 @@ import Pulsar from 'pulsar-client'; import {Kafka, logLevel} from 'kafkajs'; import winston from "winston"; import {WorkflowDispatcher} from "./WorkflowDispatcher.js"; -import {StepLog} from "./StepLog.js"; import Step from "./Step.js"; import {createStepPersistence} from "./StepPersistence.js"; import {TemplateUtils} from "./utils/TemplateUtils.js"; import {WorkflowPersistence} from "./WorkflowPersistence.js"; -import jp from "stated-js/dist/src/JsonPointer.js"; import util from "util"; import fs from "fs"; import path from "path"; import {Delay} from "../test/TestTools.js" import {Snapshot} from "./Snapshot.js"; +import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; +import {PulsarClientMock} from "../test/PulsarMock.js"; const writeFile = util.promisify(fs.writeFile); const basePath = path.join(process.cwd(), '.state'); @@ -89,7 +89,8 @@ export class StatedWorkflow { this.templateProcessor.initCallbacks = [ // --- clear the dispatcher --- ()=>{this.workflowDispatcher && this.workflowDispatcher.clear()}, - //--- start periodic snapshotting --- + //--- add rateLimited --- + // ()=>{ ()=>{ const {snapshot: snapshotOpts} = this.templateProcessor.options; if(!snapshotOpts){ @@ -182,13 +183,12 @@ export class StatedWorkflow { } } + createPulsarClientMock(params) { + if (this.pulsarClient) return; - ensureClient(params) { - if (!params || params.type == 'pulsar') { - this.createPulsarClient(params); - } else if (params.type == 'kafka') { - this.createKafkaClient(params); - } + this.pulsarClient = new PulsarClientMock({ + serviceUrl: 'pulsar://localhost:6650', + }); } createPulsarClient(params) { @@ -228,6 +228,10 @@ export class StatedWorkflow { this.publishKafka(params, clientParams); } else if(clientType==="pulsar") { this.publishPulsar(params, clientParams); + }else if(clientType === 'pulsarMock'){ + this.logger.debug(`publishing to pulsarMock using ${clientParams}`) + this.createPulsarClientMock(clientParams); + this.publishPulsar(params, clientParams); }else{ throw new Error(`Unsupported clientType: ${clientType}`); } @@ -325,8 +329,6 @@ export class StatedWorkflow { // }); - - if (source === 'http') { return this.onHttp(subscribeOptions); } @@ -358,54 +360,39 @@ export class StatedWorkflow { }); // Store the consumer in the map this.consumers.set(type, consumer); - let data; let countdown = maxConsume; while (true) { try { - data = await consumer.receive(); - let obj; + const message = await consumer.receive(); + let messageData; try { - const str = data.getData().toString(); - obj = JSON.parse(str); + const messageDataStr = message.getData().toString(); + messageData = JSON.parse(messageDataStr); } catch (error) { console.error("unable to parse data to json:", error); + // TODO - should we acknowledge the message here? + continue; } let resolve; this.latch = new Promise((_resolve) => { resolve = _resolve; //we assign our resolve variable that is declared outside this promise so that our onDataChange callbacks can use it }); - this.templateProcessor.setDataChangeCallback('/', async (data, jsonPtrs, removed) => { - for (let jsonPtr of jsonPtrs) { - if (/^\/step\d+\/log\/.*$/.test(jsonPtr)) { - await writeFile(path.join(basePath,'template.json') , StatedREPL.stringify(data), 'utf8'); - } - if (/^\/step1\/log\/.*$/.test(jsonPtr)) { - // TODO: await persist the step - const dataThatChanged = jp.get(data, jsonPtr); - if (dataThatChanged.start !== undefined && dataThatChanged.end === undefined) { - resolve(); - } - } - } - - }); - + // we create a callback to acknowledge the message + const dataAckCallback = async () => { + const promise = consumer.acknowledge(message); + console.log(`acknowledging messageId ${StatedREPL.stringify(message.getMessageId().toString())} for messageData: ${StatedREPL.stringify(messageData)}`); + } //if the dispatchers max parallelism is reached this loop should block, which is why we await - await this.workflowDispatcher.dispatchToAllSubscribers(type, obj); + await this.workflowDispatcher.dispatchToAllSubscribers(type, messageData, dataAckCallback); if(countdown && --countdown===0){ break; } } catch (error) { console.error("Error receiving or dispatching message:", error); } finally { - if (data !== undefined) { - await this.latch; - consumer.acknowledge(data); - } - if (this.pulsarClient === undefined) { break; } @@ -498,6 +485,10 @@ export class StatedWorkflow { this.logger.debug(`subscribing to pulsar (default) using ${clientParams}`) this.createPulsarClient(clientParams); this.subscribePulsar(subscriptionParams); + }else if(clientType === 'pulsarMock'){ + this.logger.debug(`subscribing to pulsarMock using ${clientParams}`) + this.createPulsarClientMock(clientParams); + this.subscribePulsar(subscriptionParams); }else{ throw new Error(`unsupported client.type in ${StatedREPL.stringify(subscriptionParams)}`); } @@ -774,8 +765,8 @@ export class StatedWorkflow { // await consumer.disconnect(); // } try { - await this.workflowDispatcher.clear(); - await this.templateProcessor.close(); + if (this.workflowDispatcher) await this.workflowDispatcher.clear(); + if (this.templateProcessor) await this.templateProcessor.close(); } catch (error) { console.error("Error closing workflow dispatcher:", error); diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index 103d383..7ee78a9 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -26,6 +26,7 @@ export class WorkflowDispatcher { this.subscriberId = subscriberId; this.type = type; this.queue = []; + this.dataAckCallbacks = new Map(); this.active = 0; this.promises = []; this.batchMode = false; @@ -73,12 +74,12 @@ export class WorkflowDispatcher { } } - async dispatchToAllSubscribers(type, data) { + async dispatchToAllSubscribers(type, data, dataAckCallback) { const keysSet = this.dispatchers.get(type); if (keysSet) { for (let key of keysSet) { const dispatcher = this.dispatcherObjects.get(key); - await dispatcher.addToQueue(data); // You can pass the actual data you want to dispatch here + await dispatcher.addToQueue(data, dataAckCallback); // You can pass the actual data you want to dispatch here } } else { StatedWorkflow.logger.warn(`No subscribers found for type ${type}`); @@ -112,6 +113,16 @@ export class WorkflowDispatcher { const index = this.promises.indexOf(promise); if (index > -1) { this.promises.splice(index, 1); + if (this.dataAckCallbacks.get(eventData)) { + // console.debug("calling dataAckCallbacks for ", eventData); + const callbackPromise = this.dataAckCallbacks.get(eventData)(); + callbackPromise.then(() => { + // console.debug("dataAckCallbacks resolved for ", eventData); + }).catch(error => { + console.error("Error calling dataAckCallbacks:", error); + }); + delete this.dataAckCallbacks.get(eventData); + } } this._dispatch(); }); @@ -138,14 +149,17 @@ export class WorkflowDispatcher { record.shift(); //we keep a history of the active count for 10 values over time } } - async addToQueue(data) { + async addToQueue(data, dataAckCallback) { return new Promise(async (resolve, reject) => { const tryAddToQueue = async () => { - this._logActivity("active", this.active); - this._logActivity("queue", this.queue.length); + this._logActivity("log", {"t": new Date().getTime(), "acivie": this.active, "queue": this.queue.length}); if (this.active < this.parallelism) { this.queue.push(data); + if (dataAckCallback) { + // console.debug("adding dataAckCallbacks for ", data); + this.dataAckCallbacks.set(data, dataAckCallback); + } resolve(); // Resolve the promise to signal that the data was queued this._dispatch(); // Attempt to dispatch the next task } else { diff --git a/yarn.lock b/yarn.lock index a830643..c517b9d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -23,7 +23,7 @@ resolved "https://registry.npmjs.org/@babel/compat-data/-/compat-data-7.23.5.tgz" integrity sha512-uU27kfDRlhfKl+w1U6vp16IuvSLtjAxdArVXPa9BvLkrr7CYIsxH5adpHObeAGY/41+syctUWOZ140a2Rvkgjw== -"@babel/core@^7.0.0", "@babel/core@^7.0.0-0", "@babel/core@^7.11.6", "@babel/core@^7.12.3", "@babel/core@^7.8.0": +"@babel/core@^7.11.6", "@babel/core@^7.12.3": version "7.23.7" resolved "https://registry.npmjs.org/@babel/core/-/core-7.23.7.tgz" integrity sha512-+UpDgowcmqe36d4NwqvKsyPMlOLNGMsfMmQ5WGCu+siCe3t3dfe9njrzGfdN4qq+bcNUt0+Vw6haRxBOycs4dw== @@ -297,7 +297,7 @@ resolved "https://registry.npmjs.org/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== -"@colors/colors@^1.6.0", "@colors/colors@1.6.0": +"@colors/colors@1.6.0", "@colors/colors@^1.6.0": version "1.6.0" resolved "https://registry.npmjs.org/@colors/colors/-/colors-1.6.0.tgz" integrity sha512-Ir+AOibqzrIsL6ajt3Rz3LskB7OiMVHqltZmspbW/TJuTVuyOMirVqAkjfY6JISiLHgyNqicAC8AyHHGzNd/dA== @@ -839,13 +839,13 @@ bindings@^1.5.0: dependencies: file-uri-to-path "1.0.0" -body-parser@^1.20.2: - version "1.20.2" - resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.2.tgz" - integrity sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA== +body-parser@1.20.1: + version "1.20.1" + resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.1.tgz" + integrity sha512-jWi7abTbYwajOytWCQc37VulmWiRae5RyTpaCyDcS5/lMdtwSz5lOpDE67srw/HYe35f1z3fDQw+3txg7gNtWw== dependencies: bytes "3.1.2" - content-type "~1.0.5" + content-type "~1.0.4" debug "2.6.9" depd "2.0.0" destroy "1.2.0" @@ -853,17 +853,17 @@ body-parser@^1.20.2: iconv-lite "0.4.24" on-finished "2.4.1" qs "6.11.0" - raw-body "2.5.2" + raw-body "2.5.1" type-is "~1.6.18" unpipe "1.0.0" -body-parser@1.20.1: - version "1.20.1" - resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.1.tgz" - integrity sha512-jWi7abTbYwajOytWCQc37VulmWiRae5RyTpaCyDcS5/lMdtwSz5lOpDE67srw/HYe35f1z3fDQw+3txg7gNtWw== +body-parser@^1.20.2: + version "1.20.2" + resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.2.tgz" + integrity sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA== dependencies: bytes "3.1.2" - content-type "~1.0.4" + content-type "~1.0.5" debug "2.6.9" depd "2.0.0" destroy "1.2.0" @@ -871,7 +871,7 @@ body-parser@1.20.1: iconv-lite "0.4.24" on-finished "2.4.1" qs "6.11.0" - raw-body "2.5.1" + raw-body "2.5.2" type-is "~1.6.18" unpipe "1.0.0" @@ -890,7 +890,7 @@ braces@^3.0.2: dependencies: fill-range "^7.0.1" -browserslist@^4.22.2, "browserslist@>= 4.21.0": +browserslist@^4.22.2: version "4.22.2" resolved "https://registry.npmjs.org/browserslist/-/browserslist-4.22.2.tgz" integrity sha512-0UgcrvQmBDvZHFGdYUehrCNIazki7/lUP3kkoi/r3YB2amZbFM9J43ZRkJTXBUZK4gmx56+Sqk9+Vs9mwZx9+A== @@ -946,16 +946,7 @@ caniuse-lite@^1.0.30001565: resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001576.tgz" integrity sha512-ff5BdakGe2P3SQsMsiqmt1Lc8221NR1VzHj5jXN5vBny9A6fpze94HiVV/n7XRosOlsShJcvMv5mdnpjOGCEgg== -chalk@^2.4.1: - version "2.4.2" - resolved "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz" - integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ== - dependencies: - ansi-styles "^3.2.1" - escape-string-regexp "^1.0.5" - supports-color "^5.3.0" - -chalk@^2.4.2: +chalk@^2.4.1, chalk@^2.4.2: version "2.4.2" resolved "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz" integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ== @@ -1016,14 +1007,7 @@ collect-v8-coverage@^1.0.0: resolved "https://registry.npmjs.org/collect-v8-coverage/-/collect-v8-coverage-1.0.2.tgz" integrity sha512-lHl4d5/ONEbLlJvaJNtsF/Lz+WvB07u2ycqTYbdrq7UypDXailES4valYb2eWiJFxZlVmpGekfqoxQhzyFdT4Q== -color-convert@^1.9.0: - version "1.9.3" - resolved "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz" - integrity sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg== - dependencies: - color-name "1.1.3" - -color-convert@^1.9.3: +color-convert@^1.9.0, color-convert@^1.9.3: version "1.9.3" resolved "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz" integrity sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg== @@ -1037,16 +1021,16 @@ color-convert@^2.0.1: dependencies: color-name "~1.1.4" -color-name@^1.0.0, color-name@~1.1.4: - version "1.1.4" - resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz" - integrity sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA== - color-name@1.1.3: version "1.1.3" resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz" integrity sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw== +color-name@^1.0.0, color-name@~1.1.4: + version "1.1.4" + resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz" + integrity sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA== + color-string@^1.6.0: version "1.9.1" resolved "https://registry.npmjs.org/color-string/-/color-string-1.9.1.tgz" @@ -1135,27 +1119,6 @@ cross-spawn@^7.0.3: shebang-command "^2.0.0" which "^2.0.1" -debug@^4.1.0: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - -debug@^4.1.1: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - -debug@^4.3.1: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - debug@2.6.9: version "2.6.9" resolved "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz" @@ -1163,7 +1126,7 @@ debug@2.6.9: dependencies: ms "2.0.0" -debug@4: +debug@4, debug@^4.1.0, debug@^4.1.1, debug@^4.3.1: version "4.3.4" resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== @@ -1616,7 +1579,7 @@ inflight@^1.0.4: once "^1.3.0" wrappy "1" -inherits@^2.0.3, inherits@2, inherits@2.0.4: +inherits@2, inherits@2.0.4, inherits@^2.0.3: version "2.0.4" resolved "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz" integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== @@ -1925,7 +1888,7 @@ jest-resolve-dependencies@^29.7.0: jest-regex-util "^29.6.3" jest-snapshot "^29.7.0" -jest-resolve@*, jest-resolve@^29.7.0: +jest-resolve@^29.7.0: version "29.7.0" resolved "https://registry.npmjs.org/jest-resolve/-/jest-resolve-29.7.0.tgz" integrity sha512-IOVhZSrg+UvVAshDSDtHyFCCBUl/Q3AAJv8iZ6ZjnZ74xzvwuzLXid9IIIPgTnY62SJjfuupMKZsZQRsCvxEgA== @@ -2308,11 +2271,6 @@ mkdirp@^1.0.3: resolved "https://registry.npmjs.org/mkdirp/-/mkdirp-1.0.4.tgz" integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw== -ms@^2.1.1: - version "2.1.3" - resolved "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz" - integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== - ms@2.0.0: version "2.0.0" resolved "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz" @@ -2323,7 +2281,7 @@ ms@2.1.2: resolved "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz" integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== -ms@2.1.3: +ms@2.1.3, ms@^2.1.1: version "2.1.3" resolved "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== @@ -2648,7 +2606,7 @@ rimraf@^3.0.2: dependencies: glob "^7.1.3" -safe-buffer@~5.2.0, safe-buffer@5.2.1: +safe-buffer@5.2.1, safe-buffer@~5.2.0: version "5.2.1" resolved "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz" integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== @@ -2663,31 +2621,12 @@ safe-stable-stringify@^2.3.1: resolved "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz" integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg== -semver@^6.0.0: - version "6.3.1" - resolved "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz" - integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== - -semver@^6.3.0, semver@^6.3.1: +semver@^6.0.0, semver@^6.3.0, semver@^6.3.1: version "6.3.1" resolved "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz" integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== -semver@^7.3.5: - version "7.5.4" - resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" - integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== - dependencies: - lru-cache "^6.0.0" - -semver@^7.5.3: - version "7.5.4" - resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" - integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== - dependencies: - lru-cache "^6.0.0" - -semver@^7.5.4: +semver@^7.3.5, semver@^7.5.3, semver@^7.5.4: version "7.5.4" resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== @@ -2840,13 +2779,6 @@ statuses@2.0.1: resolved "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz" integrity sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ== -string_decoder@^1.1.1: - version "1.3.0" - resolved "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz" - integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== - dependencies: - safe-buffer "~5.2.0" - string-argv@^0.3.2: version "0.3.2" resolved "https://registry.npmjs.org/string-argv/-/string-argv-0.3.2.tgz" @@ -2869,6 +2801,13 @@ string-length@^4.0.1: is-fullwidth-code-point "^3.0.0" strip-ansi "^6.0.1" +string_decoder@^1.1.1: + version "1.3.0" + resolved "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz" + integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== + dependencies: + safe-buffer "~5.2.0" + strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.1.tgz" @@ -2998,7 +2937,7 @@ undici-types@~5.26.4: resolved "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz" integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA== -unpipe@~1.0.0, unpipe@1.0.0: +unpipe@1.0.0, unpipe@~1.0.0: version "1.0.0" resolved "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz" integrity sha512-pjy2bYhSsufwWlKwPc+l3cN7+wuJlK6uz0YdJEOlQDbl6jo/YlPi4mb8agUkVC8BF7V8NuzeyPNqRksA3hztKQ==