Skip to content

Commit

Permalink
Merge pull request #4005 from eclipse/fix-event_bus_improvement
Browse files Browse the repository at this point in the history
:fix: improve event bus observability
  • Loading branch information
Coduz authored Apr 2, 2024
2 parents ab4b252 + 7d79862 commit 3025da1
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,10 @@
@Singleton
public class MetricsSecurityPlugin {

private static final String CONNECTION = "connection";
private static final String SESSION = "session";
private static final String ACL = "acl";
private static final String BROKER_CONNECTION = "broker_connection";
private static final String SESSION_CONTEXT = "session_context";
private static final String SESSION_CONTEXT_BY_CLIENT = "session_context_by_client";
private static final String ACTIVE_CONNECTION = "active_connection";
private static final String DISK_USAGE = "disk_usage";
private static final String TOTAL_CONNECTION = "total_connection";
private static final String TOTAL_MESSAGE = "total_message";
private static final String TOTAL_MESSAGE_ACKNOWLEDGED = "total_message_acknowledged";
private static final String TOTAL_MESSAGE_ADDED = "total_message_added";
Expand All @@ -57,12 +52,6 @@ public void init(ActiveMQServer server, Gauge<Integer> mapSize, Gauge<Integer> m
metricsService.registerGauge(aclSize, metricModuleName, LoginMetric.COMPONENT_LOGIN, ACL);
metricsService.registerGauge(activeConnection, metricModuleName, LoginMetric.COMPONENT_LOGIN, ACTIVE_CONNECTION);

metricsService.registerGauge(() -> server.getSessions().size(), metricModuleName, LoginMetric.COMPONENT_LOGIN, SESSION);
metricsService.registerGauge(() -> server.getConnectionCount(), metricModuleName, LoginMetric.COMPONENT_LOGIN, CONNECTION);
metricsService.registerGauge(() -> server.getBrokerConnections().size(), metricModuleName, LoginMetric.COMPONENT_LOGIN, BROKER_CONNECTION);
//from broker
metricsService.registerGauge(() -> server.getDiskStoreUsage(), metricModuleName, LoginMetric.COMPONENT_LOGIN, DISK_USAGE, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalConnectionCount(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_CONNECTION, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalMessageCount(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_MESSAGE, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalMessagesAcknowledged(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_MESSAGE_ACKNOWLEDGED, MetricsLabel.SIZE);
metricsService.registerGauge(() -> server.getTotalMessagesAdded(), metricModuleName, LoginMetric.COMPONENT_LOGIN, TOTAL_MESSAGE_ADDED, MetricsLabel.SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface ServiceEventBusDriver {
public void stop() throws ServiceEventBusException;

public ServiceEventBus getEventBus();

public Boolean isConnected();
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ private void setSession(ServiceEvent kapuaEvent) {
KapuaSession.createFrom(kapuaEvent.getScopeId(), kapuaEvent.getUserId());
}

public Boolean isConnected() {
return eventBusJMSConnectionBridge.isConnected();
}

/**
* Stop the event bus
*
Expand Down Expand Up @@ -209,9 +213,15 @@ private class EventBusJMSConnectionBridge {
private Connection jmsConnection;
private Map<String, SenderPool> senders = new HashMap<>();
private ExceptionListenerImpl exceptionListener;
private Boolean connected;

public EventBusJMSConnectionBridge() {
this.exceptionListener = new ExceptionListenerImpl();
connected = Boolean.FALSE;
}

Boolean isConnected() {
return connected;
}

void start() throws JMSException, NamingException, ServiceEventBusException {
Expand All @@ -230,6 +240,7 @@ void start() throws JMSException, NamingException, ServiceEventBusException {

jmsConnection = jmsConnectionFactory.createConnection(eventbusUsername, eventbusPassword);
jmsConnection.setExceptionListener(exceptionListener);
connected = Boolean.TRUE;
jmsConnection.start();
}

Expand All @@ -239,6 +250,7 @@ void stop() throws ServiceEventBusException {
}

private void closeConnection() {
connected = Boolean.FALSE;
try {
if (jmsConnection != null) {
exceptionListener.stop();
Expand Down Expand Up @@ -436,6 +448,7 @@ private class ExceptionListenerImpl implements ExceptionListener {

@Override
public void onException(JMSException e) {
connected = Boolean.FALSE;
LOGGER.error("EventBus Listener {} - Connection thrown exception: {}", this, e.getMessage(), e);
commonsMetric.getEventBusConnectionError().inc();
int i = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ protected void configureModule() {
@Singleton
ServiceEventBusDriver serviceEventBusDriver() {
return new ServiceEventBusDriver() {

private Boolean connected = Boolean.FALSE;

@Override
public String getType() {
return "test";
}

@Override
public void start() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.TRUE;
}

@Override
public void stop() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.FALSE;
}

@Override
Expand All @@ -61,6 +64,11 @@ public void subscribe(String address, String name, ServiceEventBusListener event
}
};
}

@Override
public Boolean isConnected() {
return connected;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@ String eventsModuleName() {
@Singleton
ServiceEventBusDriver serviceEventBusDriver() {
return new ServiceEventBusDriver() {

private Boolean connected = Boolean.FALSE;

@Override
public String getType() {
return "test";
}

@Override
public void start() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.TRUE;
}

@Override
public void stop() throws ServiceEventBusException {
//Nothing to do!
connected = Boolean.FALSE;
}

@Override
Expand All @@ -76,6 +79,11 @@ public void subscribe(String address, String name, ServiceEventBusListener event
}
};
}

@Override
public Boolean isConnected() {
return connected;
}
};
}
}

0 comments on commit 3025da1

Please sign in to comment.