Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshots test #18

Merged
merged 19 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions example/inhabitants-with-delay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# producer will be sending some test data
produceParams:
type: "residents"
# fetch one page of planet data from the Star Wars API
data: ${[1].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)}
client:
type: test
subscribeResidents:
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${ getResidentsWorkflow }
subscriberId: subscribeResidents
parallelism: 4
client:
type: test
getResidentsWorkflow:
function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } }
extractResidents:
function: /${ function($planet){( $sleep($random() * 10) ; $planet.residents.($fetch($).json()) )} } # add a random delay
fetchResidents:
function: /${ function($residents){$residents.($set('/residents/-',{'name':$.name, 'url':$.url}))} }
residents: [ ]
# starts producer function
send$: $publish(produceParams)
recv$: $subscribe(subscribeResidents)

2 changes: 1 addition & 1 deletion example/pubsub-data-function-pulsar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ produceParams:
data: |
${
function(){
{'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $floor($random()*10)+1 }
{'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $ceil($random()*10) }
}
}
client:
Expand Down
51 changes: 51 additions & 0 deletions example/rebelCommunication.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Droid R2-D2 is sending messages to the Rebel Alliance's communication channel
produceParams:
type: "rebel-comm-channel"
data: |
${
function(){
{'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $ceil($random()*10) }
}
}
client:
type: pulsarMock
# Droid C-3PO will intercept and log each received message for the Rebel Alliance
subscribeParams: #parameters for subscribing to a holocomm transmission
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same channel as R2-D2 to intercept messages
to: /${processMessageWorkflow}
subscriberId: protocolDroid
parallelism: 5
initialPosition: latest
client:
type: pulsarMock
processMessageWorkflow: /${ function($message){ $message ~> $serial([step1, step2, step3]) }}
step1:
name: logMessage
function: /${ function($e){( $set('/interceptedMessages/-', $e); $e)} }
step2:
name: identifySender
function: |
${ function($e){(
{
'sender': $e.sender,
'name': $fetch('https://swapi.dev/api/people/'& $e.sender).json().name,
'location': $e.location
}
)} }
step3:
name: classifyLocation
function: |
/${ function($e){(
$e.location > 0.5 ? $set('/farFarAway/-', $e) : $set('/nearBy/-', $e); $e
)} }
# Activates R2-D2's message transmission function every 50 milliseconds
send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }"
# Activates C-3PO's message interception function
recv$: $subscribe(subscribeParams)
# interceptedMessages is where C-3PO will store the results of message decoding
interceptedMessages: [ ]
farFarAway: [ ]
zhirafovod marked this conversation as resolved.
Show resolved Hide resolved
nearBy: [ ]
# This condition stops the transmission operation when interceptedMessages has 10 elements
stop$: ($count(interceptedMessages)>=10?($clearInterval(send);'transmissionAccomplished'):'transmissionOngoing')
251 changes: 251 additions & 0 deletions src/test/PulsarMock.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
export class PulsarClientMock {
// A map used to simulate an in-memory store for messages. The keys are topic names, and the values are arrays of message objects.
static inMemoryStore = new Map();
zhirafovod marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with all these being static, it will not be possible to run more than one test client on a given system. I am not sure that is a good idea ... perhaps good enough for now, but prolly worth the small refactor to make then instance fields

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big, but a feature - I want the data to persist client instance restart, so we can demonstrate how server side acks work


// A counter to generate unique message IDs. It is incremented each time a message is sent.
static messageIdCounter = 0;

// A map where keys are topic names and values are arrays of receive function invocations (listeners). These listeners are called to notify about new messages.
static listeners = new Map();

// The default time in milliseconds to wait before a message is considered not acknowledged (acknowledged messages are removed from visibility to simulate message acknowledgment behavior).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do ack's need timeouts? Is that something pulsar-like that we are trying to simulate? What is the concept of "visibility"?

static ackTimeout = 30000;

// A nested map where the first key is the topic name, the second key is the subscriber ID, and the value is a set of message IDs that have been acknowledged by the subscriber.
static acknowledgedMessages = new Map();

static configureAckTimeout(timeout) {
this.ackTimeout = timeout;
}

async createProducer(config) {
const topic = config.topic;
if (!PulsarClientMock.inMemoryStore.has(topic)) {
PulsarClientMock.inMemoryStore.set(topic, []);
}

return {
send: async (message) => {
const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`);
const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, '');
const messages = PulsarClientMock.inMemoryStore.get(topic) || [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the || [] will result in a phantom topic that is not tracked anywhere

messages.push({ message: messageInstance, subscriberIds: new Set() }); // Track which subscribers have received the message

// Ensure all subscribers are aware of the new message
PulsarClientMock.notifyListeners(topic);

return { messageId: messageId.toString() };
},
close: async () => {},
};
}

/**
* creates a consumer per topic and subscriberId. If more than one subscriber with the same subscriberId is created
* for the same topic, they will process messages in a FIFO manner.
*/

async subscribe(config) {
const topic = config.topic;
const subscriberId = config.subscription;


return {

/**
* receives either returns the next visible message, or blocks until a message is available.
*/
receive: () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we don't have TS to declare return type is promise, marking this method async may be a good idea for documenting that it returns a Promise

return new Promise((resolve) => {
const tryResolve = () => {
const messages = PulsarClientMock.inMemoryStore.get(topic) || [];
const messageIndex = messages.findIndex(m => !m.subscriberIds.has(subscriberId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use .find() here instead of findIndex

if (messageIndex !== -1) { // there a message available for this subscriber
const message = messages[messageIndex];
message.subscriberIds.add(subscriberId); // Mark as received by this subscriber

// Make the message acknowledged after a timeout for this subscriber
setTimeout(() => {
if (!PulsarClientMock.isAcknowledged(topic, message.message.messageId.id, subscriberId)) {
// If not acknowledged, make it visible again to all subscribers
message.subscriberIds.delete(subscriberId);
PulsarClientMock.notifyListeners(topic);
}
}, PulsarClientMock.ackTimeout);

resolve(message.message);

} else { // no message available for this subscriber, wait for the next one
if (!PulsarClientMock.listeners.has(topic)) {
PulsarClientMock.listeners.set(topic, []);
}
// add this function invocation to the list of listeners for this topic
PulsarClientMock.listeners.get(topic).push(tryResolve);
zhirafovod marked this conversation as resolved.
Show resolved Hide resolved
}
};

tryResolve();
});
},
acknowledge: async (message) => {/**/
PulsarClientMock.acknowledgeMessage(topic, message.messageId.id, subscriberId);
},
acknowledgeId: async (messageId) => {
PulsarClientMock.acknowledgeMessage(topic, messageId.id, subscriberId);
},
close: async () => {},
};
}

static isAcknowledged(topic, messageId, subscriberId) {
const topicAcks = this.acknowledgedMessages.get(topic);
const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined;
return subscriberAcks ? subscriberAcks.has(messageId) : false;
}

static acknowledgeMessage(topic, messageId, subscriberId) {
if (!this.acknowledgedMessages.has(topic)) {
this.acknowledgedMessages.set(topic, new Map());
}

const topicAcks = this.acknowledgedMessages.get(topic);
if (!topicAcks.has(subscriberId)) {
topicAcks.set(subscriberId, new Set());
}

const subscriberAcks = topicAcks.get(subscriberId);
subscriberAcks.add(messageId);
}

static getTopics() {
return Array.from(PulsarClientMock.inMemoryStore.keys());
}

static getStats(topic, subscriberId) {
const messages = this.inMemoryStore.get(topic) || [];
const topicAcks = this.acknowledgedMessages.get(topic);
const subscriberAcks = topicAcks ? (topicAcks.get(subscriberId) || new Set()) : new Set();

// Calculate inFlight count as messages not yet acknowledged by this subscriber
const inFlight = messages.filter(m => !subscriberAcks.has(m.message.messageId.id)).length;
// Queue length includes messages not yet received or acknowledged by this subscriber
const queueLength = messages.length - subscriberAcks.size;

return {
acknowledgedCount: subscriberAcks.size,
inFlightCount: inFlight,
queueLength,
};
}

static getAcknowledgedMessages(topic, subscriberId) {
const topicAcks = this.acknowledgedMessages.get(topic);
const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined;

if (!subscriberAcks) {
return [];
}

const messages = this.inMemoryStore.get(topic) || [];
// Filter messages that have been acknowledged by the subscriber
return messages
.filter(m => subscriberAcks.has(m.message.messageId.id))
.map(m => m.message);
}

/**
* Notify all listeners for a topic about a message available.
* A listener is a receive invocation that is waiting for a message to be available.
* A message can be a new one, or the one with expired visibility timeout
*/
static notifyListeners(topic) {
const listeners = PulsarClientMock.listeners.get(topic) || [];
while (listeners.length > 0) {
const listener = listeners.shift();
zhirafovod marked this conversation as resolved.
Show resolved Hide resolved
listener();
}
}

static clear() {
PulsarClientMock.inMemoryStore.clear();
PulsarClientMock.listeners.clear();
PulsarClientMock.acknowledgedMessages.clear();
}

close() {
return Promise.resolve(null);
}
}

export class MessageId {
constructor(id) {
this.id = id;
}

static earliest() {
return new MessageId("earliest");
}

static latest() {
return new MessageId("latest");
}

static deserialize(data) {
// Assuming the input data is a Buffer containing a string ID
return new MessageId(data.toString());
}

serialize() {
// Convert the ID to a Buffer
return Buffer.from(this.id);
}

toString() {
return this.id;
}
}

export class Message {
constructor(topicName, properties, data, messageId, publishTimestamp, eventTimestamp, redeliveryCount, partitionKey) {
this.topicName = topicName;
this.properties = properties;
this.data = data;
this.messageId = messageId;
this.publishTimestamp = publishTimestamp;
this.eventTimestamp = eventTimestamp;
this.redeliveryCount = redeliveryCount;
this.partitionKey = partitionKey;
}

getTopicName() {
return this.topicName;
}

getProperties() {
return this.properties;
}

getData() {
return this.data;
}

getMessageId() {
return this.messageId;
}

getPublishTimestamp() {
return this.publishTimestamp;
}

getEventTimestamp() {
return this.eventTimestamp;
}

getRedeliveryCount() {
return this.redeliveryCount;
}

getPartitionKey() {
return this.partitionKey;
}
}
Loading
Loading