Skip to content

Commit

Permalink
feedback++
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Sergeev committed Feb 24, 2024
1 parent 9f9ae55 commit 0feb7f0
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 52 deletions.
131 changes: 86 additions & 45 deletions src/test/PulsarMock.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
export class PulsarClientMock {
static inMemoryStore = new Map(); // Static store to hold messages for each topic
static messageIdCounter = 0; // Global counter to generate unique message IDs
static listeners = new Map(); // Global map to hold listeners for message consumption
static ackTimeout = 30000; // Default acknowledgment timeout (in milliseconds)
static acknowledgedMessages = new Map(); // Static store to hold acknowledged messages for each topic
static inMemoryStore = new Map();
static messageIdCounter = 0;
static listeners = new Map();
static ackTimeout = 30000;
static acknowledgedMessages = new Map(); // Stores acknowledgments per subscriber ID per topic

// Allows configuration of the acknowledgment timeout
static configureAckTimeout(timeout) {
this.ackTimeout = timeout;
}

async createProducer(config) {
const topic = config.topic;
// Ensure a message queue exists for the topic
if (!PulsarClientMock.inMemoryStore.has(topic)) {
PulsarClientMock.inMemoryStore.set(topic, []);
}
Expand All @@ -22,8 +20,9 @@ export class PulsarClientMock {
const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`);
const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, '');
const messages = PulsarClientMock.inMemoryStore.get(topic) || [];
messages.push({ message: messageInstance, visible: true });
messages.push({ message: messageInstance, subscriberIds: new Set() }); // Track which subscribers have received the message

// Ensure all subscribers are aware of the new message
PulsarClientMock.notifyListeners(topic);

return { messageId: messageId.toString() };
Expand All @@ -32,88 +31,124 @@ export class PulsarClientMock {
};
}

/**
* creates a consumer per topic and subscriberId. If more than one subscriber with the same subscriberId is created
* for the same topic, they will process messages in a FIFO manner.
*/

async subscribe(config) {
const topic = config.topic;
const subscriberId = config.subscription;


return {

/**
* receives either returns the next visible message, or blocks until a message is available.
*/
receive: () => {
return new Promise((resolve) => {
const tryResolve = () => {
const messages = PulsarClientMock.inMemoryStore.get(topic) || [];
const messageIndex = messages.findIndex(m => m.visible);
if (messageIndex !== -1) {
// Make the message temporarily invisible to simulate message locking
messages[messageIndex].visible = false;
const messageIndex = messages.findIndex(m => !m.subscriberIds.has(subscriberId));
if (messageIndex !== -1) { // there a message available for this subscriber
const message = messages[messageIndex];
message.subscriberIds.add(subscriberId); // Mark as received by this subscriber

// Make the message acknowledged after a timeout for this subscriber
setTimeout(() => {
// Make the message visible again after the timeout
messages[messageIndex].visible = true;
PulsarClientMock.notifyListeners(topic);
if (!PulsarClientMock.isAcknowledged(topic, message.message.messageId.id, subscriberId)) {
// If not acknowledged, make it visible again to all subscribers
message.subscriberIds.delete(subscriberId);
PulsarClientMock.notifyListeners(topic);
}
}, PulsarClientMock.ackTimeout);
resolve(messages[messageIndex].message);
} else {
// No visible messages available, wait for new messages

resolve(message.message);

} else { // no message available for this subscriber, wait for the next one
if (!PulsarClientMock.listeners.has(topic)) {
PulsarClientMock.listeners.set(topic, []);
}
// add this function invocation to the list of listeners for this topic
PulsarClientMock.listeners.get(topic).push(tryResolve);
}
};

tryResolve();
});
},
acknowledge: async (message) => {
PulsarClientMock.acknowledgeMessage(topic, message.messageId.id);
acknowledge: async (message) => {/**/
PulsarClientMock.acknowledgeMessage(topic, message.messageId.id, subscriberId);
},
acknowledgeId: async (messageId) => {
PulsarClientMock.acknowledgeMessage(topic, messageId.id);
PulsarClientMock.acknowledgeMessage(topic, messageId.id, subscriberId);
},
close: async () => {},
};
}

static acknowledgeMessage(topic, messageId) {
const messages = PulsarClientMock.inMemoryStore.get(topic) || [];
const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId);
if (messageIndex !== -1) {
const [acknowledgedMessage] = messages.splice(messageIndex, 1); // Remove and get the acknowledged message
// Store acknowledged message
if (!this.acknowledgedMessages.has(topic)) {
this.acknowledgedMessages.set(topic, []);
}
this.acknowledgedMessages.get(topic).push(acknowledgedMessage.message);
static isAcknowledged(topic, messageId, subscriberId) {
const topicAcks = this.acknowledgedMessages.get(topic);
const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined;
return subscriberAcks ? subscriberAcks.has(messageId) : false;
}

static acknowledgeMessage(topic, messageId, subscriberId) {
if (!this.acknowledgedMessages.has(topic)) {
this.acknowledgedMessages.set(topic, new Map());
}

const topicAcks = this.acknowledgedMessages.get(topic);
if (!topicAcks.has(subscriberId)) {
topicAcks.set(subscriberId, new Set());
}

const subscriberAcks = topicAcks.get(subscriberId);
subscriberAcks.add(messageId);
}

static getTopics() {
return Array.from(PulsarClientMock.inMemoryStore.keys());
}

// Method to return statistics for the client
static getStats(topic) {
static getStats(topic, subscriberId) {
const messages = this.inMemoryStore.get(topic) || [];
const acknowledged = this.acknowledgedMessages.get(topic) || [];
const inFlight = messages.filter(m => !m.visible).length;
const queueLength = messages.length + acknowledged.length;
const topicAcks = this.acknowledgedMessages.get(topic);
const subscriberAcks = topicAcks ? (topicAcks.get(subscriberId) || new Set()) : new Set();

// Calculate inFlight count as messages not yet acknowledged by this subscriber
const inFlight = messages.filter(m => !subscriberAcks.has(m.message.messageId.id)).length;
// Queue length includes messages not yet received or acknowledged by this subscriber
const queueLength = messages.length - subscriberAcks.size;

return {
acknowledgedCount: acknowledged.length,
acknowledgedCount: subscriberAcks.size,
inFlightCount: inFlight,
queueLength,
};
}

// Method to return all acknowledged messages for a topic
static getAcknowledgedMessages(topic) {
return this.acknowledgedMessages.get(topic) || [];
}
static getAcknowledgedMessages(topic, subscriberId) {
const topicAcks = this.acknowledgedMessages.get(topic);
const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined;

static clear() {
PulsarClientMock.inMemoryStore.clear();
PulsarClientMock.listeners.clear();
PulsarClientMock.acknowledgedMessages.clear();
if (!subscriberAcks) {
return [];
}

const messages = this.inMemoryStore.get(topic) || [];
// Filter messages that have been acknowledged by the subscriber
return messages
.filter(m => subscriberAcks.has(m.message.messageId.id))
.map(m => m.message);
}

/**
* Notify all listeners for a topic about a message available.
* A listener is a receive invocation that is waiting for a message to be available.
* A message can be a new one, or the one with expired visibility timeout
*/
static notifyListeners(topic) {
const listeners = PulsarClientMock.listeners.get(topic) || [];
while (listeners.length > 0) {
Expand All @@ -122,6 +157,12 @@ export class PulsarClientMock {
}
}

static clear() {
PulsarClientMock.inMemoryStore.clear();
PulsarClientMock.listeners.clear();
PulsarClientMock.acknowledgedMessages.clear();
}

close() {
return Promise.resolve(null);
}
Expand Down
8 changes: 4 additions & 4 deletions src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import fs from 'fs';
import yaml from 'js-yaml';
import { fileURLToPath } from 'url';
import path from 'path';
import {WorkflowDispatcher} from "../workflow/WorkflowDispatcher.js";
import StatedREPL from "stated-js/dist/src/StatedREPL.js";
import {EnhancedPrintFunc} from "./TestTools.js";
import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js";
Expand Down Expand Up @@ -1020,11 +1019,12 @@ test("subscribePulsar with pulsarMock client", async () => {

console.log("waiting for at least 10 messages to be acknowledged");
const topic = PulsarClientMock.getTopics()[0]; // we use only one topic in the test
const subscriberId = tp.output.subscribeParams.type;

while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(topic))
|| PulsarClientMock.getAcknowledgedMessages(topic).length < 10) {
console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`);
|| PulsarClientMock.getAcknowledgedMessages(topic, subscriberId).length < 10) {
console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`);
await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 500ms
};
console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`);
console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`);
}, 200000)
4 changes: 1 addition & 3 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ import Pulsar from 'pulsar-client';
import {Kafka, logLevel} from 'kafkajs';
import winston from "winston";
import {WorkflowDispatcher} from "./WorkflowDispatcher.js";
import {StepLog} from "./StepLog.js";
import Step from "./Step.js";
import {createStepPersistence} from "./StepPersistence.js";
import {TemplateUtils} from "./utils/TemplateUtils.js";
import {WorkflowPersistence} from "./WorkflowPersistence.js";
import jp from "stated-js/dist/src/JsonPointer.js";
import util from "util";
import fs from "fs";
import path from "path";
Expand Down Expand Up @@ -384,7 +382,7 @@ export class StatedWorkflow {
// we create a callback to acknowledge the message
const dataAckCallback = async () => {
const promise = consumer.acknowledge(message);
console.log(`acknowledging message ${message} for messageData: ${messageData}`);
console.log(`acknowledging messageId ${StatedREPL.stringify(message.getMessageId().toString())} for messageData: ${StatedREPL.stringify(messageData)}`);
}

//if the dispatchers max parallelism is reached this loop should block, which is why we await
Expand Down

0 comments on commit 0feb7f0

Please sign in to comment.