From 9df1ee2464d27dfea42c387e27fc084c0278b420 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Kol=C3=A1rik?= Date: Wed, 13 Mar 2024 13:52:39 +0100 Subject: [PATCH] feat(sharded): add an option for dynamic private channels --- lib/sharded-adapter.ts | 42 ++++++++++++++++++++++++++++++------------ test/test-runner.ts | 22 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/lib/sharded-adapter.ts b/lib/sharded-adapter.ts index eba8e9e..ea3f37a 100644 --- a/lib/sharded-adapter.ts +++ b/lib/sharded-adapter.ts @@ -34,13 +34,17 @@ export interface ShardedRedisAdapterOptions { * The default value, useful when some rooms have a low number of clients (so only a few Socket.IO servers are notified). * * Only public rooms (i.e. not related to a particular Socket ID) are taken in account, because: - * * - a lot of connected clients would mean a lot of subscription/unsubscription * - the Socket ID attribute is ephemeral * + * - "dynamic-private" + * + * Like "dynamic" but creates separate channels for private rooms as well. Useful when there is lots of 1:1 communication + * via socket.emit() calls. + * * @default "dynamic" */ - subscriptionMode?: "static" | "dynamic"; + subscriptionMode?: "static" | "dynamic" | "dynamic-private"; } /** @@ -89,17 +93,18 @@ class ShardedRedisAdapter extends ClusterAdapter { SSUBSCRIBE(this.subClient, this.channel, handler); SSUBSCRIBE(this.subClient, this.responseChannel, handler); - if (this.opts.subscriptionMode === "dynamic") { + if ( + this.opts.subscriptionMode === "dynamic" || + this.opts.subscriptionMode === "dynamic-private" + ) { this.on("create-room", (room) => { - const isPublicRoom = !this.sids.has(room); - if (isPublicRoom) { + if (this.shouldUseASeparateNamespace(room)) { SSUBSCRIBE(this.subClient, this.dynamicChannel(room), handler); } }); this.on("delete-room", (room) => { - const isPublicRoom = !this.sids.has(room); - if (isPublicRoom) { + if (this.shouldUseASeparateNamespace(room)) { SUNSUBSCRIBE(this.subClient, this.dynamicChannel(room)); } }); @@ -109,10 +114,12 @@ class ShardedRedisAdapter extends ClusterAdapter { override close(): Promise | void { const channels = [this.channel, this.responseChannel]; - if (this.opts.subscriptionMode === "dynamic") { + if ( + this.opts.subscriptionMode === "dynamic" || + this.opts.subscriptionMode === "dynamic-private" + ) { this.rooms.forEach((_sids, room) => { - const isPublicRoom = !this.sids.has(room); - if (isPublicRoom) { + if (this.shouldUseASeparateNamespace(room)) { channels.push(this.dynamicChannel(room)); } }); @@ -136,11 +143,13 @@ class ShardedRedisAdapter extends ClusterAdapter { // broadcast with ack can not use a dynamic channel, because the serverCount() method return the number of all // servers, not only the ones where the given room exists const useDynamicChannel = - this.opts.subscriptionMode === "dynamic" && message.type === MessageType.BROADCAST && message.data.requestId === undefined && message.data.opts.rooms.length === 1 && - !looksLikeASocketId(message.data.opts.rooms[0]); + ((this.opts.subscriptionMode === "dynamic" && + !looksLikeASocketId(message.data.opts.rooms[0])) || + this.opts.subscriptionMode === "dynamic-private"); + if (useDynamicChannel) { return this.dynamicChannel(message.data.opts.rooms[0]); } else { @@ -204,4 +213,13 @@ class ShardedRedisAdapter extends ClusterAdapter { override serverCount(): Promise { return PUBSUB(this.pubClient, "SHARDNUMSUB", this.channel); } + + private shouldUseASeparateNamespace(room: string): boolean { + const isPublicRoom = !this.sids.has(room); + + return ( + (this.opts.subscriptionMode === "dynamic" && isPublicRoom) || + this.opts.subscriptionMode === "dynamic-private" + ); + } } diff --git a/test/test-runner.ts b/test/test-runner.ts index e994c63..d80c1d5 100644 --- a/test/test-runner.ts +++ b/test/test-runner.ts @@ -175,6 +175,28 @@ describe("@socket.io/redis-adapter", () => { true )); + describe("[sharded] redis@4 standalone (dynamic subscription mode & dynamic private channels)", () => + testSuite( + async () => { + const pubClient = createClient(); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createShardedAdapter(pubClient, subClient, { + subscriptionMode: "dynamic-private", + }), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, + "redis@4", + true + )); + describe("[sharded] redis@4 standalone (static subscription mode)", () => testSuite( async () => {