From 0feb7f07ad1c365eb17de3102c6cec932d7a3454 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 20:42:07 -0800 Subject: [PATCH] feedback++ --- src/test/PulsarMock.js | 131 +++++++++++++++++++++----------- src/test/StatedWorkflow.test.js | 8 +- src/workflow/StatedWorkflow.js | 4 +- 3 files changed, 91 insertions(+), 52 deletions(-) diff --git a/src/test/PulsarMock.js b/src/test/PulsarMock.js index a86612a..13c8250 100644 --- a/src/test/PulsarMock.js +++ b/src/test/PulsarMock.js @@ -1,18 +1,16 @@ export class PulsarClientMock { - static inMemoryStore = new Map(); // Static store to hold messages for each topic - static messageIdCounter = 0; // Global counter to generate unique message IDs - static listeners = new Map(); // Global map to hold listeners for message consumption - static ackTimeout = 30000; // Default acknowledgment timeout (in milliseconds) - static acknowledgedMessages = new Map(); // Static store to hold acknowledged messages for each topic + static inMemoryStore = new Map(); + static messageIdCounter = 0; + static listeners = new Map(); + static ackTimeout = 30000; + static acknowledgedMessages = new Map(); // Stores acknowledgments per subscriber ID per topic - // Allows configuration of the acknowledgment timeout static configureAckTimeout(timeout) { this.ackTimeout = timeout; } async createProducer(config) { const topic = config.topic; - // Ensure a message queue exists for the topic if (!PulsarClientMock.inMemoryStore.has(topic)) { PulsarClientMock.inMemoryStore.set(topic, []); } @@ -22,8 +20,9 @@ export class PulsarClientMock { const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - messages.push({ message: messageInstance, visible: true }); + messages.push({ message: messageInstance, subscriberIds: new Set() }); // Track which subscribers have received the message + // Ensure all subscribers are aware of the new message PulsarClientMock.notifyListeners(topic); return { messageId: messageId.toString() }; @@ -32,29 +31,46 @@ export class PulsarClientMock { }; } + /** + * creates a consumer per topic and subscriberId. If more than one subscriber with the same subscriberId is created + * for the same topic, they will process messages in a FIFO manner. + */ + async subscribe(config) { const topic = config.topic; + const subscriberId = config.subscription; + return { + + /** + * receives either returns the next visible message, or blocks until a message is available. + */ receive: () => { return new Promise((resolve) => { const tryResolve = () => { const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - const messageIndex = messages.findIndex(m => m.visible); - if (messageIndex !== -1) { - // Make the message temporarily invisible to simulate message locking - messages[messageIndex].visible = false; + const messageIndex = messages.findIndex(m => !m.subscriberIds.has(subscriberId)); + if (messageIndex !== -1) { // there a message available for this subscriber + const message = messages[messageIndex]; + message.subscriberIds.add(subscriberId); // Mark as received by this subscriber + + // Make the message acknowledged after a timeout for this subscriber setTimeout(() => { - // Make the message visible again after the timeout - messages[messageIndex].visible = true; - PulsarClientMock.notifyListeners(topic); + if (!PulsarClientMock.isAcknowledged(topic, message.message.messageId.id, subscriberId)) { + // If not acknowledged, make it visible again to all subscribers + message.subscriberIds.delete(subscriberId); + PulsarClientMock.notifyListeners(topic); + } }, PulsarClientMock.ackTimeout); - resolve(messages[messageIndex].message); - } else { - // No visible messages available, wait for new messages + + resolve(message.message); + + } else { // no message available for this subscriber, wait for the next one if (!PulsarClientMock.listeners.has(topic)) { PulsarClientMock.listeners.set(topic, []); } + // add this function invocation to the list of listeners for this topic PulsarClientMock.listeners.get(topic).push(tryResolve); } }; @@ -62,58 +78,77 @@ export class PulsarClientMock { tryResolve(); }); }, - acknowledge: async (message) => { - PulsarClientMock.acknowledgeMessage(topic, message.messageId.id); + acknowledge: async (message) => {/**/ + PulsarClientMock.acknowledgeMessage(topic, message.messageId.id, subscriberId); }, acknowledgeId: async (messageId) => { - PulsarClientMock.acknowledgeMessage(topic, messageId.id); + PulsarClientMock.acknowledgeMessage(topic, messageId.id, subscriberId); }, close: async () => {}, }; } - static acknowledgeMessage(topic, messageId) { - const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId); - if (messageIndex !== -1) { - const [acknowledgedMessage] = messages.splice(messageIndex, 1); // Remove and get the acknowledged message - // Store acknowledged message - if (!this.acknowledgedMessages.has(topic)) { - this.acknowledgedMessages.set(topic, []); - } - this.acknowledgedMessages.get(topic).push(acknowledgedMessage.message); + static isAcknowledged(topic, messageId, subscriberId) { + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined; + return subscriberAcks ? subscriberAcks.has(messageId) : false; + } + + static acknowledgeMessage(topic, messageId, subscriberId) { + if (!this.acknowledgedMessages.has(topic)) { + this.acknowledgedMessages.set(topic, new Map()); + } + + const topicAcks = this.acknowledgedMessages.get(topic); + if (!topicAcks.has(subscriberId)) { + topicAcks.set(subscriberId, new Set()); } + + const subscriberAcks = topicAcks.get(subscriberId); + subscriberAcks.add(messageId); } static getTopics() { return Array.from(PulsarClientMock.inMemoryStore.keys()); } - // Method to return statistics for the client - static getStats(topic) { + static getStats(topic, subscriberId) { const messages = this.inMemoryStore.get(topic) || []; - const acknowledged = this.acknowledgedMessages.get(topic) || []; - const inFlight = messages.filter(m => !m.visible).length; - const queueLength = messages.length + acknowledged.length; + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? (topicAcks.get(subscriberId) || new Set()) : new Set(); + + // Calculate inFlight count as messages not yet acknowledged by this subscriber + const inFlight = messages.filter(m => !subscriberAcks.has(m.message.messageId.id)).length; + // Queue length includes messages not yet received or acknowledged by this subscriber + const queueLength = messages.length - subscriberAcks.size; return { - acknowledgedCount: acknowledged.length, + acknowledgedCount: subscriberAcks.size, inFlightCount: inFlight, queueLength, }; } - // Method to return all acknowledged messages for a topic - static getAcknowledgedMessages(topic) { - return this.acknowledgedMessages.get(topic) || []; - } + static getAcknowledgedMessages(topic, subscriberId) { + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined; - static clear() { - PulsarClientMock.inMemoryStore.clear(); - PulsarClientMock.listeners.clear(); - PulsarClientMock.acknowledgedMessages.clear(); + if (!subscriberAcks) { + return []; + } + + const messages = this.inMemoryStore.get(topic) || []; + // Filter messages that have been acknowledged by the subscriber + return messages + .filter(m => subscriberAcks.has(m.message.messageId.id)) + .map(m => m.message); } + /** + * Notify all listeners for a topic about a message available. + * A listener is a receive invocation that is waiting for a message to be available. + * A message can be a new one, or the one with expired visibility timeout + */ static notifyListeners(topic) { const listeners = PulsarClientMock.listeners.get(topic) || []; while (listeners.length > 0) { @@ -122,6 +157,12 @@ export class PulsarClientMock { } } + static clear() { + PulsarClientMock.inMemoryStore.clear(); + PulsarClientMock.listeners.clear(); + PulsarClientMock.acknowledgedMessages.clear(); + } + close() { return Promise.resolve(null); } diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 69e2829..eedf3c9 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -16,7 +16,6 @@ import fs from 'fs'; import yaml from 'js-yaml'; import { fileURLToPath } from 'url'; import path from 'path'; -import {WorkflowDispatcher} from "../workflow/WorkflowDispatcher.js"; import StatedREPL from "stated-js/dist/src/StatedREPL.js"; import {EnhancedPrintFunc} from "./TestTools.js"; import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; @@ -1020,11 +1019,12 @@ test("subscribePulsar with pulsarMock client", async () => { console.log("waiting for at least 10 messages to be acknowledged"); const topic = PulsarClientMock.getTopics()[0]; // we use only one topic in the test + const subscriberId = tp.output.subscribeParams.type; while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(topic)) - || PulsarClientMock.getAcknowledgedMessages(topic).length < 10) { - console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); + || PulsarClientMock.getAcknowledgedMessages(topic, subscriberId).length < 10) { + console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`); await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 500ms }; - console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); + console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`); }, 200000) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 8933022..bc8f6a9 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -19,12 +19,10 @@ import Pulsar from 'pulsar-client'; import {Kafka, logLevel} from 'kafkajs'; import winston from "winston"; import {WorkflowDispatcher} from "./WorkflowDispatcher.js"; -import {StepLog} from "./StepLog.js"; import Step from "./Step.js"; import {createStepPersistence} from "./StepPersistence.js"; import {TemplateUtils} from "./utils/TemplateUtils.js"; import {WorkflowPersistence} from "./WorkflowPersistence.js"; -import jp from "stated-js/dist/src/JsonPointer.js"; import util from "util"; import fs from "fs"; import path from "path"; @@ -384,7 +382,7 @@ export class StatedWorkflow { // we create a callback to acknowledge the message const dataAckCallback = async () => { const promise = consumer.acknowledge(message); - console.log(`acknowledging message ${message} for messageData: ${messageData}`); + console.log(`acknowledging messageId ${StatedREPL.stringify(message.getMessageId().toString())} for messageData: ${StatedREPL.stringify(messageData)}`); } //if the dispatchers max parallelism is reached this loop should block, which is why we await