From 0887f89d94516a3da46ec47e5dfe85453a0ba59c Mon Sep 17 00:00:00 2001 From: "marcin.cebo" Date: Tue, 28 Jan 2025 13:18:23 +0100 Subject: [PATCH] Added multiTreading test for subscribe. --- .../integration/SubscribeIntegrationTests.kt | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt index 48a3c81be..07b8ba1a3 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt @@ -24,6 +24,7 @@ import okhttp3.logging.HttpLoggingInterceptor import org.junit.Assert.assertEquals import org.junit.Assert.assertNotNull import org.junit.Assert.assertTrue +import org.junit.Ignore import org.junit.Test import org.junit.jupiter.api.Timeout import java.util.concurrent.CountDownLatch @@ -804,6 +805,84 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { success.listen() } + @Ignore // For manual use only. It takes long time to run. + @Test + fun useAllEffectiveThreadsAndForEachOneSubscribeToManyChannels() { + val threads = mutableListOf() + + val channelsList00 = ('a'..'z').map { "0$it" } + val channelsList01 = ('a'..'z').map { "1$it" } + val channelsList02 = ('a'..'z').map { "2$it" } + val channelsList03 = ('a'..'z').map { "3$it" } + val channelsList04 = ('a'..'z').map { "4$it" } + val channelsList05 = ('a'..'z').map { "5$it" } + val channelsList06 = ('a'..'z').map { "6$it" } + val channelsList07 = ('a'..'z').map { "7$it" } + val channelsList08 = ('a'..'z').map { "8$it" } + val channelsList09 = ('a'..'z').map { "9$it" } + + val thread00 = createThread(channelsList00) + val thread01 = createThread(channelsList01) + val thread02 = createThread(channelsList02) + val thread03 = createThread(channelsList03) + val thread04 = createThread(channelsList04) + val thread05 = createThread(channelsList05) + val thread06 = createThread(channelsList06) + val thread07 = createThread(channelsList07) + val thread08 = createThread(channelsList08) + val thread09 = createThread(channelsList09) + threads.add(thread00) + threads.add(thread01) + threads.add(thread02) + threads.add(thread03) + threads.add(thread04) + threads.add(thread05) + threads.add(thread06) + threads.add(thread07) + threads.add(thread08) + threads.add(thread09) + + threads.forEach { thread -> thread.start() } + + publishToChannels(channelsList00) + publishToChannels(channelsList01) + publishToChannels(channelsList02) + publishToChannels(channelsList03) + publishToChannels(channelsList04) + publishToChannels(channelsList05) + publishToChannels(channelsList06) + publishToChannels(channelsList07) + publishToChannels(channelsList08) + publishToChannels(channelsList09) + + threads.forEach { thread -> thread.join() } + + Thread.sleep(3000) + + publishToChannels(channelsList00) + publishToChannels(channelsList01) + publishToChannels(channelsList02) + publishToChannels(channelsList03) + publishToChannels(channelsList04) + publishToChannels(channelsList05) + publishToChannels(channelsList06) + publishToChannels(channelsList07) + publishToChannels(channelsList08) + publishToChannels(channelsList09) + + Thread.sleep(3000) + + val allChannelsLists = listOf( + channelsList00, channelsList01, channelsList02, channelsList03, + channelsList04, channelsList05, channelsList06, channelsList07, + channelsList08, channelsList09 + ) + + val channelsFromAllThreads = allChannelsLists.flatten() + + assertEquals(pubnub.getSubscribedChannels().toSet(), channelsFromAllThreads.toSet()) + } + @Test fun testAssigningEventBehaviourToSubscriptionSet() { val successMessage = AtomicInteger(0) @@ -892,4 +971,21 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { subscriptionSet -= subscription01 assertEquals(2, subscriptionSet.subscriptions.size) } + + private fun publishToChannels(channelsList: List) { + channelsList.forEach { channelName -> + pubnub.publish(channelName, "-=message to $channelName").sync() + } + } + + private fun createThread(channelsList: List) = Thread { + channelsList.forEach { channelName -> + val chan01 = pubnub.channel(channelName) + val sub01 = chan01.subscription() + sub01.onMessage = { pnMessageResult: PNMessageResult -> + println("-=message: " + pnMessageResult.message + " channel: " + pnMessageResult.channel + " channelName: " + channelName) + } + sub01.subscribe() + } + } }