diff --git a/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java b/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java index 7abfd99..a7c3ea5 100644 --- a/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java +++ b/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java @@ -2,19 +2,18 @@ import io.apicurio.registry.probe.persistence.CustomerEntity; import io.apicurio.registry.probe.smoke.ProbeMonitoring; +import io.quarkus.runtime.StartupEvent; import io.smallrye.mutiny.Multi; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.core.MediaType; +import jakarta.enterprise.event.Observes; import org.eclipse.microprofile.reactive.messaging.Channel; -import org.jboss.resteasy.reactive.RestSseElementType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + @ApplicationScoped -@Path("/consumed-customers") public class ConsumedCustomersResource { private static final Logger log = LoggerFactory.getLogger(ProbeMonitoring.class); @@ -22,13 +21,30 @@ public class ConsumedCustomersResource { @Channel("customers-from-kafka") Multi customers; - @GET - @Produces(MediaType.SERVER_SENT_EVENTS) - @RestSseElementType(MediaType.TEXT_PLAIN) - public Multi stream() { - return customers.map(customer -> { + public void startMonitoring(@Observes StartupEvent startupEvent) { + int concurrentTasks = 2; + + try { + concurrentTasks = Integer.parseInt(System.getenv("CONCURRENT_TASKS")); + } catch (Exception e) { + log.warn("Cannot load concurrent tasks environment variable", e); + } + + ExecutorService e = Executors.newFixedThreadPool(concurrentTasks); + + for (int i = 0; i < concurrentTasks; i++) { + e.submit(() -> { + log.info("Removing customers..."); + removeCustomers(); + }); + } + } + + public void removeCustomers() { + customers.map(customer -> { final String stringToReturn = String.format("'%s' from %s", customer.getFirstName(), customer.getEmail()); try { + log.info("Deleting customer with email: {}", customer.getEmail()); customer.delete(); } catch (Exception e) { log.error("Exception detected in the Probe application: {}", e.getCause(), e); diff --git a/src/main/java/io/apicurio/registry/probe/monitoring/LoadGenerator.java b/src/main/java/io/apicurio/registry/probe/monitoring/LoadGenerator.java index 62af5d6..e395429 100644 --- a/src/main/java/io/apicurio/registry/probe/monitoring/LoadGenerator.java +++ b/src/main/java/io/apicurio/registry/probe/monitoring/LoadGenerator.java @@ -2,7 +2,6 @@ import io.apicurio.registry.probe.persistence.CustomerEntity; import io.apicurio.registry.probe.smoke.ProbeMonitoring; -import io.apicurio.registry.rest.client.RegistryClient; import io.quarkus.runtime.StartupEvent; import jakarta.enterprise.event.Observes; import jakarta.transaction.Transactional; @@ -18,8 +17,6 @@ public class LoadGenerator { private static final Logger log = LoggerFactory.getLogger(ProbeMonitoring.class); public void startMonitoring(@Observes StartupEvent startupEvent) { - - int concurrentTasks = 2; try {