Skip to content

Commit

Permalink
Fix Pulsar tests
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles committed Sep 25, 2024
1 parent 13dd401 commit 48b044a
Show file tree
Hide file tree
Showing 17 changed files with 107 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class InfiniticClient(
"Some ongoing messages may not have been sent properly."
}
} finally {
deleteClientTopics()
deleteClientTopic()
config.transport.close()
}
}
Expand All @@ -113,10 +113,10 @@ class InfiniticClient(
}

/**
* Deletes the topics associated with the client
* Deletes the topic associated with the client
* (Do NOT delete the client DLQ topic to allow manual inspection of failed messages)
*/
private suspend fun deleteClientTopics() {
private suspend fun deleteClientTopic() {
if (::consumer.isLazyInitialized) {
val name = getName()
resources.deleteTopicForClient(name).getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
*
* Licensor: infinitic.io
*/
package io.infinitic.transport.consumers
package io.infinitic.common.transport.consumers

import io.infinitic.common.data.MillisInstant
import io.infinitic.common.fixtures.runAndCancel
import io.infinitic.common.transport.MessageBatchConfig
import io.infinitic.common.transport.consumers.ConsumerSharedProcessor
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
Expand Down Expand Up @@ -226,7 +225,7 @@ internal class ConsumerBatchedProcessorTests : StringSpec(
runAndCancel {
processorWithError.start(concurrency)
}

checkAllProcessedAreAcknowledged()
checkBeforeNegativeAcknowledged()
negativeAcknowledgedList shouldBe emptySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
*
* Licensor: infinitic.io
*/
package io.infinitic.transport.consumers
package io.infinitic.common.transport.consumers

import io.infinitic.common.data.MillisInstant
import io.infinitic.common.fixtures.runAndCancel
import io.infinitic.common.transport.consumers.ConsumerSharedProcessor
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
*
* Licensor: infinitic.io
*/
package io.infinitic.transport.consumers
package io.infinitic.common.transport.consumers

import io.infinitic.common.data.MillisInstant
import io.infinitic.common.fixtures.runAndCancel
import io.infinitic.common.transport.consumers.ConsumerUniqueProcessor
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*
* Licensor: infinitic.io
*/
package io.infinitic.transport.consumers
package io.infinitic.common.transport.consumers

import io.infinitic.common.data.MillisInstant
import io.infinitic.common.transport.MessageBatchConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DockerOnly : EnabledCondition {

val pulsarServer by lazy {
when (shouldRun) {
true -> PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:$pulsarVersion")).also { it.start() }
true -> PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:$pulsarVersion"))
false -> null
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,21 @@ fun later(delay: Long = 100L, f: suspend CoroutineScope.() -> Unit) = CoroutineS
f()
}

suspend fun runAndCancel(block: suspend CoroutineScope.() -> Unit): CancellationException {
suspend fun runWithContextAndCancel(block: suspend context(CoroutineScope) () -> Unit) {
val scope = CoroutineScope(Dispatchers.IO)

later { scope.cancel() }

return block(scope)
}

suspend fun runAndCancel(block: suspend () -> Unit): CancellationException {
val scope = CoroutineScope(Dispatchers.IO)

later { scope.cancel() }

return shouldThrow<CancellationException> {
withContext(scope.coroutineContext) {
block()
}
withContext(scope.coroutineContext) { block() }
}
}

Expand Down
5 changes: 4 additions & 1 deletion infinitic-tests/src/test/kotlin/io/infinitic/Test.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package io.infinitic
import io.infinitic.common.fixtures.DockerOnly
import io.infinitic.common.workflows.data.workflows.WorkflowId
import io.infinitic.common.workflows.engine.state.WorkflowState
import io.infinitic.pulsar.admin.InfiniticPulsarAdmin
import io.infinitic.pulsar.client.InfiniticPulsarClient
import io.infinitic.transport.config.InMemoryTransportConfig
import io.infinitic.transport.config.PulsarTransportConfig
import io.infinitic.utils.Listener
Expand Down Expand Up @@ -71,6 +73,8 @@ internal object Test {
val client by lazy { worker.client }

fun start() {
InfiniticPulsarClient.clearCaches()
InfiniticPulsarAdmin.clearCaches()
pulsarServer?.start()
worker.startAsync()
}
Expand All @@ -81,7 +85,6 @@ internal object Test {
}
}


/**
* This listener is used to close resources after all tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,14 @@ class InfiniticPulsarAdmin(
companion object {
private const val DEFAULT_NUM_PARTITIONS = 3

@JvmStatic
fun clearCaches() {
initializedTenants.clear()
initializedNamespaces.clear()
initializedTopics.clear()
subscriptionCheckedForConsumers.clear()
}

// set of initialized tenants
private val initializedTenants = mutableMapOf<String, Result<TenantInfo>>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,6 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
}
}

/**
* Closes a consumer and removes it from the list of consumers.
*
* @param consumer The consumer to close.
* @return Result of the close operation.
* - Result.success(Unit) if the consumer was closed successfully.
* - Result.failure(e) if an error occurred during the close operation.
*/
fun closeConsumer(consumer: Consumer<*>): Result<Unit> = try {
consumer.close()
Result.success(Unit)
} catch (e: PulsarClientException) {
Result.failure(e)
}

/**
* Get existing producer or create a new one
*
Expand Down Expand Up @@ -403,5 +388,10 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
companion object {
// producer per topic
val producers = ConcurrentHashMap<String, Producer<Envelope<out Message>>>()

@JvmStatic
fun clearCaches() {
producers.clear()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package io.infinitic.pulsar

import io.infinitic.common.clients.data.ClientName
import io.infinitic.common.fixtures.runAndCancel
import io.infinitic.common.fixtures.runWithContextAndCancel
import io.infinitic.common.messages.Envelope
import io.infinitic.common.messages.Message
import io.infinitic.common.tasks.data.ServiceName
Expand Down Expand Up @@ -54,6 +54,7 @@ import io.mockk.spyk
import net.bytebuddy.utility.RandomString
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Schema
import java.util.concurrent.CompletableFuture

class PulsarInfiniticConsumerTests : StringSpec(
{
Expand All @@ -74,26 +75,28 @@ class PulsarInfiniticConsumerTests : StringSpec(
policies = PoliciesConfig(),
)

val original = PulsarResources(pulsarConfig)

val pulsarResources = spyk(original) {
val pulsarResources = spyk(PulsarResources(pulsarConfig)) {
coEvery { initTopicOnce(any(), any(), any()) } returns Result.success(Unit)
coEvery { initDlqTopicOnce(any(), any(), any()) } returns Result.success(Unit)
}

val pulsarConsumer = mockk<Consumer<Envelope<out Message>>> {
every { receiveAsync() } returns CompletableFuture<org.apache.pulsar.client.api.Message<Envelope<out Message>>>()
}

val client = mockk<InfiniticPulsarClient> {
every { newConsumer(any<Schema<Envelope<out Message>>>(), any(), any()) } returns
Result.success(mockk<Consumer<Envelope<out Message>>>())
Result.success(pulsarConsumer)
}

val infiniticConsumerAsync =
val infiniticConsumer =
PulsarInfiniticConsumer(client, pulsarConfig.consumer, pulsarResources)

"should init client-response topic before consuming it" {
val name = "$clientName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(ClientTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -102,6 +105,7 @@ class PulsarInfiniticConsumerTests : StringSpec(
)
}


coVerify {
pulsarResources.initTopicOnce(
"persistent://$tenant/$namespace/response:$name",
Expand All @@ -114,8 +118,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init workflow-tag topic before consuming it" {
val name = "$workflowName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(WorkflowTagEngineTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -136,8 +140,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init workflow-cmd topic before consuming it" {
val name = "$workflowName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(WorkflowStateCmdTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -158,8 +162,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init workflow-engine topic before consuming it" {
val name = "$workflowName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(WorkflowStateEngineTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -180,8 +184,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init workflow-delay topic before consuming it" {
val name = "$workflowName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(WorkflowStateTimerTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -202,8 +206,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init workflow-events topic before consuming it" {
val name = "$workflowName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(WorkflowStateEventTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -224,8 +228,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init workflow-task-executor topic before consuming it" {
val name = "$workflowName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(WorkflowExecutorTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -246,8 +250,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init workflow-task-events topic before consuming it" {
val name = "$workflowName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(WorkflowExecutorEventTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -268,8 +272,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init task-tag topic before consuming it" {
val name = "$serviceName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(ServiceTagEngineTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -290,8 +294,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init task-executor topic before consuming it" {
val name = "$serviceName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(ServiceExecutorTopic),
entity = name,
handler = { _, _ -> },
Expand All @@ -312,8 +316,8 @@ class PulsarInfiniticConsumerTests : StringSpec(
"should init task-events topic before consuming it" {
val name = "$serviceName"

runAndCancel {
infiniticConsumerAsync.start(
runWithContextAndCancel {
infiniticConsumer.start(
subscription = MainSubscription(ServiceExecutorEventTopic),
entity = name,
handler = { _, _ -> },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import io.infinitic.common.workflows.engine.messages.WorkflowStateEngineCmdMessa
import io.infinitic.common.workflows.engine.messages.WorkflowStateEngineEventMessage
import io.infinitic.common.workflows.engine.messages.WorkflowStateEngineMessage
import io.infinitic.common.workflows.tags.messages.WorkflowTagEngineMessage
import io.infinitic.pulsar.admin.InfiniticPulsarAdmin
import io.infinitic.pulsar.client.InfiniticPulsarClient
import io.infinitic.pulsar.config.policies.PoliciesConfig
import io.infinitic.pulsar.config.pulsarConfigTest
import io.kotest.assertions.throwables.shouldNotThrowAny
Expand All @@ -61,7 +63,6 @@ import io.kotest.matchers.shouldNotBe
class PulsarInfiniticProducerTests : StringSpec(
{
val pulsarConfig = pulsarConfigTest!!

val admin = pulsarConfig.infiniticPulsarAdmin
val tenant = pulsarConfig.tenant
val namespace = pulsarConfig.namespace
Expand All @@ -73,6 +74,17 @@ class PulsarInfiniticProducerTests : StringSpec(
pulsarResources,
)

beforeEach {
InfiniticPulsarClient.clearCaches()
InfiniticPulsarAdmin.clearCaches()
}

afterSpec {
pulsarConfig.pulsarClient.close()
pulsarConfig.pulsarAdmin.close()
DockerOnly().pulsarServer?.stop()
}

"publishing to an absent ClientTopic should not throw, should NOT create the topic" {
val message = TestFactory.random<ClientMessage>()

Expand Down
Loading

0 comments on commit 48b044a

Please sign in to comment.