-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use separate eventstream per namespace #140
Merged
Merged
Changes from 3 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
60ebd20
Use separate eventstream per namespace
nguyer 77ed3fe
Remove unused imports
nguyer 4f34681
Adjust test timing
nguyer d96aa08
Update node version
nguyer e58f317
Allow re-activation of token pools
nguyer fe873ea
Remove unused import
nguyer f62c69d
Remove deprecated eventstreams
nguyer 916d4e7
Updates
nguyer 352668e
Add WS log messages
nguyer 4ebae6b
PR feedback
nguyer 4d85ab6
Scope acks to recipient client
nguyer 4d59b01
Fix eventstream cache
nguyer d6cbee7
PR feedback
nguyer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
// Copyright © 2022 Kaleido, Inc. | ||
// Copyright © 2024 Kaleido, Inc. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
@@ -15,21 +15,19 @@ | |
// limitations under the License. | ||
|
||
import { Logger } from '@nestjs/common'; | ||
import { MessageBody, SubscribeMessage } from '@nestjs/websockets'; | ||
import { v4 as uuidv4 } from 'uuid'; | ||
import { Context, newContext } from '../request-context/request-context.decorator'; | ||
import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; | ||
import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service'; | ||
import { | ||
WebSocketAck, | ||
WebSocketActionBase, | ||
WebSocketEventsBase, | ||
WebSocketEx, | ||
WebSocketMessage, | ||
WebSocketStart, | ||
} from '../websocket-events/websocket-events.base'; | ||
import { | ||
AckMessageData, | ||
ConnectionListener, | ||
EventListener, | ||
WebSocketMessageBatchData, | ||
WebSocketMessageWithId, | ||
|
@@ -47,9 +45,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |
url?: string; | ||
topic?: string; | ||
|
||
private connectListeners: ConnectionListener[] = []; | ||
private eventListeners: EventListener[] = []; | ||
private awaitingAck: WebSocketMessageWithId[] = []; | ||
// Map of client IDs to all the messages for which we are awaiting an ack | ||
private awaitingAck: Map<string, WebSocketMessageWithId[]> = new Map(); | ||
private subscriptionNames = new Map<string, string>(); | ||
private queue = Promise.resolve(); | ||
|
||
|
@@ -68,13 +66,21 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |
|
||
handleConnection(client: WebSocketEx) { | ||
super.handleConnection(client); | ||
|
||
if (!this.awaitingAck.get(client.id)) { | ||
this.awaitingAck.set(client.id, []); | ||
} | ||
|
||
client.on('message', async (message: string) => { | ||
const action = JSON.parse(message) as WebSocketActionBase; | ||
switch (action.type) { | ||
case 'start': | ||
const startAction = action as WebSocketStart; | ||
this.startListening(client, startAction.namespace); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you create local vars in a |
||
break; | ||
case 'ack': | ||
const ackAction = action as WebSocketAck; | ||
this.handleAck(client, ackAction); | ||
} | ||
}); | ||
} | ||
|
@@ -134,12 +140,13 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |
// Nack any messages that are inflight for that namespace | ||
const nackedMessageIds: Set<string> = new Set(); | ||
this.awaitingAck | ||
.filter(msg => msg.namespace === namespace) | ||
?.get(client.id) | ||
?.filter(msg => msg.namespace === namespace) | ||
.map(msg => { | ||
this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber); | ||
nackedMessageIds.add(msg.id); | ||
}); | ||
this.awaitingAck = this.awaitingAck.filter(msg => nackedMessageIds.has(msg.id)); | ||
this.awaitingAck.delete(client.id); | ||
|
||
// If all clients for this namespace have disconnected, also close the connection to EVMConnect | ||
if (clientSet.size == 0) { | ||
|
@@ -150,10 +157,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |
}); | ||
} | ||
|
||
addConnectionListener(listener: ConnectionListener) { | ||
this.connectListeners.push(listener); | ||
} | ||
|
||
addEventListener(listener: EventListener) { | ||
this.eventListeners.push(listener); | ||
} | ||
|
@@ -197,8 +200,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |
}, | ||
batchNumber: batch.batchNumber, | ||
}; | ||
this.awaitingAck.push(message); | ||
this.send(namespace, JSON.stringify(message)); | ||
this.send(namespace, message); | ||
} | ||
|
||
private async getSubscriptionName(ctx: Context, subId: string) { | ||
|
@@ -219,41 +221,49 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |
return undefined; | ||
} | ||
|
||
@SubscribeMessage('ack') | ||
handleAck(@MessageBody() data: AckMessageData) { | ||
handleAck(client: WebSocketEx, data: WebSocketAck) { | ||
if (data.id === undefined) { | ||
this.logger.error('Received malformed ack'); | ||
return; | ||
} | ||
|
||
const inflight = this.awaitingAck.find(msg => msg.id === data.id); | ||
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); | ||
if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { | ||
this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id); | ||
if ( | ||
// If nothing is left awaiting an ack - then we clearly need to ack | ||
this.awaitingAck.length === 0 || | ||
// Or if we have a batch number associated with this ID, then we can only ack if there | ||
// are no other messages in-flight with the same batch number. | ||
(inflight.batchNumber !== undefined && | ||
!this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber)) | ||
) { | ||
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); | ||
this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); | ||
let awaitingAck = this.awaitingAck.get(client.id); | ||
|
||
if (awaitingAck) { | ||
const inflight = awaitingAck.find(msg => msg.id === data.id); | ||
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); | ||
if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { | ||
// Remove the acked message id from the queue | ||
awaitingAck = awaitingAck.filter(msg => msg.id !== data.id); | ||
this.awaitingAck.set(client.id, awaitingAck); | ||
if ( | ||
// If nothing is left awaiting an ack - then we clearly need to ack | ||
awaitingAck.length === 0 || | ||
// Or if we have a batch number associated with this ID, then we can only ack if there | ||
// are no other messages in-flight with the same batch number. | ||
(inflight.batchNumber !== undefined && | ||
!awaitingAck.filter(msg => msg.batchNumber === inflight.batchNumber)) | ||
) { | ||
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); | ||
this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); | ||
} | ||
} | ||
} else { | ||
this.logger.warn(`Received unrecognized ack from client ${client.id} for message ${data.id}`); | ||
} | ||
} | ||
|
||
send(namespace, payload: string) { | ||
send(namespace, payload: WebSocketMessageWithId) { | ||
const clients = this.namespaceClients.get(namespace); | ||
if (clients) { | ||
// Randomly select a connected client for this namespace to distribute load | ||
const selected = Math.floor(Math.random() * clients.size); | ||
let i = 0; | ||
for (let client of clients.keys()) { | ||
for (const client of clients.keys()) { | ||
if (i++ == selected) { | ||
this.logger.debug(`WS <= ${payload}`); | ||
client.send(payload); | ||
this.awaitingAck.get(client.id)?.push(payload); | ||
this.logger.verbose(`WS <= ${payload}`); | ||
client.send(JSON.stringify(payload)); | ||
return; | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we're now using a combination of Nest websocket message handlers (ie with
@SubscribeMessage('ack')
) and raw websocket message handlers (withon('message')
). Maybe should choose one or the other?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe. I'm not sure how nest works 😆