Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

:fix: improve event bus observability #4005

Merged
merged 2 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,10 @@
@Singleton
public class MetricsSecurityPlugin {

private static final String CONNECTION = "connection";
private static final String SESSION = "session";
private static final String ACL = "acl";
private static final String BROKER_CONNECTION = "broker_connection";
private static final String SESSION_CONTEXT = "session_context";
private static final String SESSION_CONTEXT_BY_CLIENT = "session_context_by_client";
private static final String ACTIVE_CONNECTION = "active_connection";
private static final String DISK_USAGE = "disk_usage";
private static final String TOTAL_CONNECTION = "total_connection";
private static final String TOTAL_MESSAGE = "total_message";
private static final String TOTAL_MESSAGE_ACKNOWLEDGED = "total_message_acknowledged";
private static final String TOTAL_MESSAGE_ADDED = "total_message_added";
Expand All @@ -57,12 +52,6 @@ public void init(ActiveMQServer server, Gauge<Integer> mapSize, Gauge<Integer> m
metricsService.registerGauge(aclSize, metricModuleName, LoginMetric.COMPONENT_LOGIN, ACL);
metricsService.registerGauge(activeConnection, metricModuleName, LoginMetric.COMPONENT_LOGIN, ACTIVE_CONNECTION);

metricsService.registerGauge(() -> server.getSessions().size(), metricModuleName, LoginMetric.COMPONENT_LOGIN, SESSION);
metricsService.registerGauge(() -> server.getConnectionCount(), metricModuleName, LoginMetric.COMPONENT_LOGIN, CONNECTION);
metricsService.registerGauge(() -> server.getBrokerConnections().size(), metricModuleName, LoginMetric.COMPONENT_LOGIN, BROKER_CONNECTION);
//from broker
metricsService.registerGauge(() -> server.getDiskStoreUsage(), metricModuleName, LoginMetric.COMPONENT_LOGIN, DISK_USAGE, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalConnectionCount(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_CONNECTION, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalMessageCount(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_MESSAGE, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalMessagesAcknowledged(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_MESSAGE_ACKNOWLEDGED, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalMessagesAdded(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_MESSAGE_ADDED, MetricsLabel.SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface ServiceEventBusDriver {
public void stop() throws ServiceEventBusException;

public ServiceEventBus getEventBus();

public Boolean isConnected();
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@
KapuaSession.createFrom(kapuaEvent.getScopeId(), kapuaEvent.getUserId());
}

public Boolean isConnected() {
return eventBusJMSConnectionBridge.isConnected();

Check warning on line 140 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L140

Added line #L140 was not covered by tests
}

/**
* Stop the event bus
*
Expand Down Expand Up @@ -209,9 +213,15 @@
private Connection jmsConnection;
private Map<String, SenderPool> senders = new HashMap<>();
private ExceptionListenerImpl exceptionListener;
private Boolean connected;

public EventBusJMSConnectionBridge() {
this.exceptionListener = new ExceptionListenerImpl();
connected = Boolean.FALSE;
}

Boolean isConnected() {
return connected;

Check warning on line 224 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L224

Added line #L224 was not covered by tests
}

void start() throws JMSException, NamingException, ServiceEventBusException {
Expand All @@ -230,6 +240,7 @@

jmsConnection = jmsConnectionFactory.createConnection(eventbusUsername, eventbusPassword);
jmsConnection.setExceptionListener(exceptionListener);
connected = Boolean.TRUE;

Check warning on line 243 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L243

Added line #L243 was not covered by tests
jmsConnection.start();
}

Expand All @@ -239,6 +250,7 @@
}

private void closeConnection() {
connected = Boolean.FALSE;
try {
if (jmsConnection != null) {
exceptionListener.stop();
Expand Down Expand Up @@ -436,6 +448,7 @@

@Override
public void onException(JMSException e) {
connected = Boolean.FALSE;

Check warning on line 451 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L451

Added line #L451 was not covered by tests
LOGGER.error("EventBus Listener {} - Connection thrown exception: {}", this, e.getMessage(), e);
commonsMetric.getEventBusConnectionError().inc();
int i = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ protected void configureModule() {
@Singleton
ServiceEventBusDriver serviceEventBusDriver() {
return new ServiceEventBusDriver() {

private Boolean connected = Boolean.FALSE;

@Override
public String getType() {
return "test";
}

@Override
public void start() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.TRUE;
}

@Override
public void stop() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.FALSE;
}

@Override
Expand All @@ -61,6 +64,11 @@ public void subscribe(String address, String name, ServiceEventBusListener event
}
};
}

@Override
public Boolean isConnected() {
return connected;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@ String eventsModuleName() {
@Singleton
ServiceEventBusDriver serviceEventBusDriver() {
return new ServiceEventBusDriver() {

private Boolean connected = Boolean.FALSE;

@Override
public String getType() {
return "test";
}

@Override
public void start() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.TRUE;
}

@Override
public void stop() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.FALSE;
}

@Override
Expand All @@ -76,6 +79,11 @@ public void subscribe(String address, String name, ServiceEventBusListener event
}
};
}

@Override
public Boolean isConnected() {
return connected;
}
};
}
}
Loading