Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate eventstream per namespace #140

Merged
merged 13 commits into from
Feb 28, 2024
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ PORT=3000
ETHCONNECT_URL=http://127.0.0.1:5102
ETHCONNECT_TOPIC=tokens_0_0
FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e"
AUTO_INIT=false
USE_LEGACY_ERC20_SAMPLE=false
USE_LEGACY_ERC721_SAMPLE=false
2 changes: 1 addition & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
16 changes: 8 additions & 8 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -22,7 +22,7 @@ import WebSocket from 'ws';
import { FFRequestIDHeader } from '../request-context/constants';
import { Context, newContext } from '../request-context/request-context.decorator';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { getHttpRequestOptions, getWebsocketOptions } from '../utils';
import { eventStreamName, getHttpRequestOptions, getWebsocketOptions } from '../utils';
import {
Event,
EventBatch,
Expand Down Expand Up @@ -68,7 +68,7 @@ export class EventStreamSocket {
} else {
this.logger.log('Event stream websocket connected');
}
this.produce({ type: 'listen', topic: `${this.topic}/${this.namespace}` });
this.produce({ type: 'listen', topic: eventStreamName(this.topic, this.namespace) });
this.produce({ type: 'listenreplies' });
this.ping();
})
Expand All @@ -84,7 +84,7 @@ export class EventStreamSocket {
}
})
.on('message', (message: string) => {
this.logger.debug(`WS => ${message}`);
this.logger.verbose(`WS => ${message}`);
this.handleMessage(JSON.parse(message));
})
.on('pong', () => {
Expand All @@ -111,11 +111,11 @@ export class EventStreamSocket {
}

ack(batchNumber: number | undefined) {
this.produce({ type: 'ack', topic: `${this.topic}/${this.namespace}`, batchNumber });
this.produce({ type: 'ack', topic: eventStreamName(this.topic, this.namespace), batchNumber });
}

nack(batchNumber: number | undefined) {
this.produce({ type: 'nack', topic: `${this.topic}/${this.namespace}`, batchNumber });
this.produce({ type: 'nack', topic: eventStreamName(this.topic, this.namespace), batchNumber });
}

close() {
Expand Down Expand Up @@ -211,7 +211,7 @@ export class EventStreamService {
this.logger.debug(`Checking for deprecated event steam with topic '${topic}'`);
const deprecatedStream = existingStreams.find(s => s.name === topic);
if (deprecatedStream) {
this.logger.debug(`Purging deprecated eventstream '${deprecatedStream.id}'`);
this.logger.log(`Purging deprecated eventstream '${deprecatedStream.id}'`);
await lastValueFrom(
this.http.delete(
new URL(`/eventstreams/${deprecatedStream.id}`, this.baseUrl).href,
Expand Down Expand Up @@ -358,7 +358,7 @@ export class EventStreamService {
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
const name = `${topic}/${namespace}`;
const name = eventStreamName(topic, namespace);
await this.createOrUpdateStream(newContext(), name, topic);

return new EventStreamSocket(
Expand Down
78 changes: 44 additions & 34 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -15,21 +15,19 @@
// limitations under the License.

import { Logger } from '@nestjs/common';
import { MessageBody, SubscribeMessage } from '@nestjs/websockets';
import { v4 as uuidv4 } from 'uuid';
import { Context, newContext } from '../request-context/request-context.decorator';
import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces';
import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service';
import {
WebSocketAck,
WebSocketActionBase,
WebSocketEventsBase,
WebSocketEx,
WebSocketMessage,
WebSocketStart,
} from '../websocket-events/websocket-events.base';
import {
AckMessageData,
ConnectionListener,
EventListener,
WebSocketMessageBatchData,
WebSocketMessageWithId,
Expand All @@ -47,9 +45,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
url?: string;
topic?: string;

private connectListeners: ConnectionListener[] = [];
private eventListeners: EventListener[] = [];
private awaitingAck: WebSocketMessageWithId[] = [];
// Map of client IDs to all the messages for which we are awaiting an ack
private awaitingAck: Map<string, WebSocketMessageWithId[]> = new Map();
private subscriptionNames = new Map<string, string>();
private queue = Promise.resolve();

Expand All @@ -68,13 +66,21 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {

handleConnection(client: WebSocketEx) {
super.handleConnection(client);

if (!this.awaitingAck.get(client.id)) {
this.awaitingAck.set(client.id, []);
}

client.on('message', async (message: string) => {
const action = JSON.parse(message) as WebSocketActionBase;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we're now using a combination of Nest websocket message handlers (ie with @SubscribeMessage('ack')) and raw websocket message handlers (with on('message')). Maybe should choose one or the other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I'm not sure how nest works 😆

switch (action.type) {
case 'start':
const startAction = action as WebSocketStart;
this.startListening(client, startAction.namespace);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you create local vars in a case, you may want to wrap with {}

break;
case 'ack':
const ackAction = action as WebSocketAck;
this.handleAck(client, ackAction);
}
});
}
Expand Down Expand Up @@ -134,12 +140,13 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
// Nack any messages that are inflight for that namespace
const nackedMessageIds: Set<string> = new Set();
this.awaitingAck
.filter(msg => msg.namespace === namespace)
?.get(client.id)
?.filter(msg => msg.namespace === namespace)
.map(msg => {
this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber);
nackedMessageIds.add(msg.id);
});
this.awaitingAck = this.awaitingAck.filter(msg => nackedMessageIds.has(msg.id));
this.awaitingAck.delete(client.id);

// If all clients for this namespace have disconnected, also close the connection to EVMConnect
if (clientSet.size == 0) {
Expand All @@ -150,10 +157,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
});
}

addConnectionListener(listener: ConnectionListener) {
this.connectListeners.push(listener);
}

addEventListener(listener: EventListener) {
this.eventListeners.push(listener);
}
Expand Down Expand Up @@ -197,8 +200,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
},
batchNumber: batch.batchNumber,
};
this.awaitingAck.push(message);
this.send(namespace, JSON.stringify(message));
this.send(namespace, message);
}

private async getSubscriptionName(ctx: Context, subId: string) {
Expand All @@ -219,41 +221,49 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
return undefined;
}

@SubscribeMessage('ack')
handleAck(@MessageBody() data: AckMessageData) {
handleAck(client: WebSocketEx, data: WebSocketAck) {
if (data.id === undefined) {
this.logger.error('Received malformed ack');
return;
}

const inflight = this.awaitingAck.find(msg => msg.id === data.id);
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`);
if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) {
this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id);
if (
// If nothing is left awaiting an ack - then we clearly need to ack
this.awaitingAck.length === 0 ||
// Or if we have a batch number associated with this ID, then we can only ack if there
// are no other messages in-flight with the same batch number.
(inflight.batchNumber !== undefined &&
!this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber))
) {
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`);
this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber);
let awaitingAck = this.awaitingAck.get(client.id);

if (awaitingAck) {
const inflight = awaitingAck.find(msg => msg.id === data.id);
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`);
if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) {
// Remove the acked message id from the queue
awaitingAck = awaitingAck.filter(msg => msg.id !== data.id);
this.awaitingAck.set(client.id, awaitingAck);
if (
// If nothing is left awaiting an ack - then we clearly need to ack
awaitingAck.length === 0 ||
// Or if we have a batch number associated with this ID, then we can only ack if there
// are no other messages in-flight with the same batch number.
(inflight.batchNumber !== undefined &&
!awaitingAck.filter(msg => msg.batchNumber === inflight.batchNumber))
) {
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`);
this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber);
}
}
} else {
this.logger.warn(`Received unrecognized ack from client ${client.id} for message ${data.id}`);
}
}

send(namespace, payload: string) {
send(namespace, payload: WebSocketMessageWithId) {
const clients = this.namespaceClients.get(namespace);
if (clients) {
// Randomly select a connected client for this namespace to distribute load
const selected = Math.floor(Math.random() * clients.size);
let i = 0;
for (let client of clients.keys()) {
for (const client of clients.keys()) {
if (i++ == selected) {
this.logger.debug(`WS <= ${payload}`);
client.send(payload);
this.awaitingAck.get(client.id)?.push(payload);
this.logger.verbose(`WS <= ${payload}`);
client.send(JSON.stringify(payload));
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
6 changes: 1 addition & 5 deletions src/eventstream-proxy/eventstream-proxy.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -35,10 +35,6 @@ export interface WebSocketMessageWithId extends WebSocketMessage {
batchNumber: number | undefined;
}

export interface AckMessageData {
id?: string;
}

export interface WebSocketMessageBatchData {
events: WebSocketMessage[];
}
2 changes: 1 addition & 1 deletion src/eventstream-proxy/eventstream-proxy.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
3 changes: 1 addition & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -78,7 +78,6 @@ async function bootstrap() {
const ethConnectUrl = config.get<string>('ETHCONNECT_URL', '');
const fftmUrl = config.get<string>('FFTM_URL', ''); // Optional. Currently used only for SendTransaction API calls when set
const topic = config.get<string>('ETHCONNECT_TOPIC', 'tokenERC20ERC721');
const autoInit = config.get<string>('AUTO_INIT', 'true');
const username = config.get<string>('ETHCONNECT_USERNAME', '');
const password = config.get<string>('ETHCONNECT_PASSWORD', '');
const factoryAddress = config.get<string>('FACTORY_CONTRACT_ADDRESS', '');
Expand Down
2 changes: 1 addition & 1 deletion src/request-context/request-id.middleware.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/request-logging.interceptor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/request-logging.interceptor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/abimapper.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/erc165.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/erc20.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/erc721.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/tokens.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
7 changes: 5 additions & 2 deletions src/tokens/tokens.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -36,7 +36,10 @@ import { TokensService } from './tokens.service';

@Controller()
export class TokensController {
constructor(private service: TokensService, private blockchain: BlockchainConnectorService) {}
constructor(
private service: TokensService,
private blockchain: BlockchainConnectorService,
) {}

@Post('init')
@HttpCode(204)
Expand Down
2 changes: 1 addition & 1 deletion src/tokens/tokens.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
Loading
Loading