Skip to content

Commit

Permalink
Merge pull request #3984 from dseurotech/fix-event_clients_unique
Browse files Browse the repository at this point in the history
:fix: Making sure the event listeners are unique
  • Loading branch information
Coduz authored Mar 7, 2024
2 parents af7e915 + 264661a commit 87947b0
Show file tree
Hide file tree
Showing 31 changed files with 173 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ String metricModuleName() {
return "broker-telemetry";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "telemetry";
}

@Provides
@Singleton
@Named("brokerHost")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void fillEvent(MethodInvocation invocation, ServiceEvent serviceEvent) {
Class<?> wrappedClass = ((KapuaService) invocation.getThis()).getClass().getSuperclass(); // this object should be not null
Class<?>[] implementedClass = wrappedClass.getInterfaces();
// assuming that the KapuaService implemented is specified by the first implementing interface
String serviceInterfaceName = implementedClass[0].getName();
String serviceInterfaceName = implementedClass[0].getSimpleName();
// String splittedServiceInterfaceName[] = serviceInterfaceName.split("\\.");
// String serviceName = splittedServiceInterfaceName.length > 0 ? splittedServiceInterfaceName[splittedServiceInterfaceName.length-1] : "";
// String cleanedServiceName = serviceName.substring(0, serviceName.length()-"Service".length()).toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -57,13 +58,21 @@ public ServiceEventTransactionalModule(
String internalAddress,
ServiceEventHouseKeeperFactory serviceEventTransactionalHousekeeperFactory,
ServiceEventBus serviceEventBus) {
this.serviceEventClientConfigurations = serviceEventClientConfigurations;
this(serviceEventClientConfigurations, internalAddress, UUID.randomUUID().toString(), serviceEventTransactionalHousekeeperFactory, serviceEventBus);
}

public ServiceEventTransactionalModule(
ServiceEventClientConfiguration[] serviceEventClientConfigurations,
String internalAddress,
String uniqueClientId,
ServiceEventHouseKeeperFactory serviceEventTransactionalHousekeeperFactory,
ServiceEventBus serviceEventBus) {
this.serviceEventBus = serviceEventBus;
this.serviceEventClientConfigurations = appendClientId(uniqueClientId, serviceEventClientConfigurations);
this.internalAddress = internalAddress;
this.houseKeeperFactory = serviceEventTransactionalHousekeeperFactory;
this.serviceEventBus = serviceEventBus;
}


@Override
public void start() throws KapuaException {
LOGGER.info("Starting service event module... {}", this.getClass().getName());
Expand Down Expand Up @@ -148,7 +157,7 @@ protected ServiceEventClientConfiguration[] appendClientId(String clientId, Serv
return config;
} else {
// config for @ListenServiceEvent
String subscriberName = config.getClientName() + (clientId == null ? "" : "_" + clientId);
String subscriberName = config.getClientName() + (clientId == null ? "" : "-" + clientId);
LOGGER.debug("Adding config for @ListenServiceEvent - address: {}, name: {}, listener: {}", config.getAddress(), subscriberName, config.getEventListener());
return new ServiceEventClientConfiguration(config.getAddress(), subscriberName, config.getEventListener());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@
*******************************************************************************/
package org.eclipse.kapua.commons.event;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.ArrayUtils;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.KapuaRuntimeException;
import org.eclipse.kapua.event.ListenServiceEvent;
import org.eclipse.kapua.event.RaiseServiceEvent;
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.service.KapuaService;

import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ServiceInspector {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceInspector.class);

private ServiceInspector() {}
private ServiceInspector() {
}

public static <T extends KapuaService> List<ServiceEventClientConfiguration> getEventBusClients(KapuaService aService, Class<T> clazz) {

Expand Down Expand Up @@ -83,26 +83,26 @@ public static <T extends KapuaService> List<ServiceEventClientConfiguration> get
KapuaRuntimeException.internalError(e1, String.format("Unable to get the annotated method: annotation %s", ListenServiceEvent.class));
}

for (ListenServiceEvent listenAnnotation:listenAnnotations) {
for (ListenServiceEvent listenAnnotation : listenAnnotations) {
final Method listenerMethod = enhancedMethod;
configurations.add(
new ServiceEventClientConfiguration(
listenAnnotation.fromAddress(),
clazz.getName(),
serviceEvent -> {
try {
listenerMethod.invoke(aService, serviceEvent);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw KapuaException.internalError(e, String.format("Error invoking method %s", method.getName()));
}
}));
new ServiceEventClientConfiguration(
listenAnnotation.fromAddress(),
clazz.getSimpleName(),
serviceEvent -> {
try {
listenerMethod.invoke(aService, serviceEvent);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw KapuaException.internalError(e, String.format("Error invoking method %s", method.getName()));
}
}));
}
}
if (!ArrayUtils.isEmpty(raiseAnnotations)) {
configurations.add(
new ServiceEventClientConfiguration(
null,
clazz.getName(),
clazz.getSimpleName(),
null));
}
}
Expand All @@ -122,7 +122,7 @@ private static Method getMatchingMethod(Class<?> clazz, Method method) throws No
if (!candidate.getReturnType().equals(method.getReturnType())) {
continue;
}
if(!Arrays.equals(method.getParameterTypes(), candidate.getParameterTypes())) {
if (!Arrays.equals(method.getParameterTypes(), candidate.getParameterTypes())) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ synchronized void subscribe(Subscription subscription)
String subscriptionStr = String.format("$SYS/EVT/%s", subscription.getAddress());
// create a bunch of sessions to allow parallel event processing
LOGGER.info("Subscribing to address {} - name {} ...", subscriptionStr, subscription.getName());
final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = jmsSession.createTopic(subscriptionStr);
for (int i = 0; i < consumerPoolSize; i++) {
final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = jmsSession.createTopic(subscriptionStr);
MessageConsumer jmsConsumer = jmsSession.createSharedDurableConsumer(jmsTopic, subscription.getName());
jmsConsumer.setMessageListener(message -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ commons.eventbus.producerPool.minSize=5
commons.eventbus.producerPool.maxSize=5
commons.eventbus.producerPool.maxWaitOnBorrow=100
commons.eventbus.producerPool.evictionInterval=600000
commons.eventbus.consumerPool.size=10
commons.eventbus.consumerPool.size=2
commons.eventbus.messageSerializer=org.eclipse.kapua.commons.event.XmlServiceEventMarshaler
commons.eventbus.transport.useEpoll=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ protected void configureModule() {
String metricModuleName() {
return "web-console";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "console";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ protected void configureModule() {
String metricModuleName() {
return MetricsLifecycle.CONSUMER_LIFECYCLE;
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "lifecycle";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ protected void configureModule() {
String metricModuleName() {
return MetricsTelemetry.CONSUMER_TELEMETRY;
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "telemetry";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ protected void configureModule() {
String metricModuleName() {
return "job-engine";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "job_engine";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.inject.matcher.AbstractMatcher;
import com.google.inject.matcher.Matcher;
import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.multibindings.ProvidesIntoSet;
import com.google.inject.spi.InjectionListener;
import com.google.inject.spi.TypeEncounter;
Expand Down Expand Up @@ -63,8 +62,6 @@ public class KapuaModule extends AbstractKapuaModule {

private final String resourceName;

private Multibinder<ServiceModule> serviceModulesBindings;

public KapuaModule(final String resourceName) {
this.resourceName = resourceName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ String metricModuleName() {
return "test";
}

@Provides
@Named(value = "eventsModuleName")
String eventsModuleName() {
return "test";
}

@Provides
@Singleton
ServiceEventBusDriver serviceEventBusDriver() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ protected void configureModule() {

}


@Provides
@Named("metricModuleName")
String metricModuleName() {
return "qa-tests";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "qa_tests";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ commons.eventbus.producerPool.minSize=5
commons.eventbus.producerPool.maxSize=5
commons.eventbus.producerPool.maxWaitOnBorrow=100
commons.eventbus.producerPool.evictionInterval=600000
commons.eventbus.consumerPool.size=10
commons.eventbus.consumerPool.size=2
commons.eventbus.messageSerializer=org.eclipse.kapua.commons.event.XmlServiceEventMarshaler
commons.eventbus.transport.useEpoll=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,11 @@ Boolean showStackTrace(KapuaApiCoreSetting kapuaApiCoreSetting) {
String metricModuleName() {
return "unit-tests";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "unit_tests";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,11 @@ Boolean showStackTrace(KapuaApiCoreSetting kapuaApiCoreSetting) {
String metricModuleName() {
return "rest-api";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "rest_api";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ ServiceModule accountServiceModule(AccountService accountService,
EventStoreFactory eventStoreFactory,
EventStoreRecordRepository eventStoreRecordRepository,
ServiceEventBus serviceEventBus,
KapuaAccountSetting kapuaAccountSetting
KapuaAccountSetting kapuaAccountSetting,
@Named("eventsModuleName") String eventModuleName
) throws ServiceEventBusException {
return new AccountServiceModule(
accountService,
Expand All @@ -103,7 +104,8 @@ ServiceModule accountServiceModule(AccountService accountService,
txManagerFactory.create("kapua-account"),
serviceEventBus
),
serviceEventBus);
serviceEventBus,
eventModuleName);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.eclipse.kapua.service.account.internal.setting.KapuaAccountSetting;
import org.eclipse.kapua.service.account.internal.setting.KapuaAccountSettingKeys;

import java.util.UUID;

/**
* {@link AccountService} {@link ServiceModule} implementation.
*
Expand All @@ -33,10 +35,12 @@ public AccountServiceModule(
AccountService accountService,
KapuaAccountSetting kapuaAccountSetting,
ServiceEventHouseKeeperFactory serviceEventHouseKeeperFactory,
ServiceEventBus serviceEventBus) {
ServiceEventBus serviceEventBus,
String eventModuleName) {
super(
ServiceInspector.getEventBusClients(accountService, AccountService.class).toArray(new ServiceEventClientConfiguration[0]),
kapuaAccountSetting.getString(KapuaAccountSettingKeys.ACCOUNT_EVENT_ADDRESS),
eventModuleName + "-" + UUID.randomUUID().toString(),
serviceEventHouseKeeperFactory,
serviceEventBus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ protected void configureModule() {

}


@Provides
@Named("metricModuleName")
String metricModuleName() {
return "qa-tests";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "qa_tests";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ protected void configureModule() {
String metricModuleName() {
return MetricsAuthentication.SERVICE_AUTHENTICATION;
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "authentication";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ protected ServiceModule deviceConnectionEventListenerServiceModule(DeviceConnect
@Named("DeviceRegistryTransactionManager") TxManager txManager,
EventStoreFactory eventStoreFactory,
EventStoreRecordRepository eventStoreRecordRepository,
ServiceEventBus serviceEventBus
ServiceEventBus serviceEventBus,
@Named("eventsModuleName") String eventModuleName
) throws ServiceEventBusException {

String address = kapuaDeviceRegistrySettings.getString(KapuaDeviceRegistrySettingKeys.DEVICE_EVENT_ADDRESS);
Expand All @@ -69,6 +70,7 @@ protected ServiceModule deviceConnectionEventListenerServiceModule(DeviceConnect
txManager,
serviceEventBus
),
serviceEventBus);
serviceEventBus,
eventModuleName);
}
}
Loading

0 comments on commit 87947b0

Please sign in to comment.