Skip to content

Commit

Permalink
:fix: add auto reconnect to service client
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <[email protected]>
  • Loading branch information
riccardomodanese committed Apr 5, 2024
1 parent 81a090d commit f2243bc
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,14 @@ private AccountInfo getAdminAccountInfoNoCache() throws JsonProcessingException,
SecurityAction.getEntity.name(),
EntityType.account.name(),
SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_ACCOUNT));
EntityResponse accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest);
if (accountResponse != null) {
return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName());
EntityResponse accountResponse;
try {
accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest);

Check warning on line 384 in broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java

View check run for this annotation

Codecov / codecov/patch

broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java#L384

Added line #L384 was not covered by tests
if (accountResponse != null) {
return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName());

Check warning on line 386 in broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java

View check run for this annotation

Codecov / codecov/patch

broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java#L386

Added line #L386 was not covered by tests
}
} catch (JsonProcessingException | JMSException | InterruptedException e) {
logger.warn("Error getting scopeId for user admin", e);

Check warning on line 389 in broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java

View check run for this annotation

Codecov / codecov/patch

broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java#L388-L389

Added lines #L388 - L389 were not covered by tests
}
throw new SecurityException("User not authorized! Cannot get Admin Account info!");
}
Expand All @@ -403,9 +408,13 @@ private KapuaId getScopeIdNoCache(String username) throws JsonProcessingExceptio
SecurityAction.getEntity.name(),
EntityType.user.name(),
username);
EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest);
if (userResponse != null && userResponse.getScopeId() != null) {
return KapuaEid.parseCompactId(userResponse.getScopeId());
try {
EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest);

Check warning on line 412 in broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java

View check run for this annotation

Codecov / codecov/patch

broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java#L412

Added line #L412 was not covered by tests
if (userResponse != null && userResponse.getScopeId() != null) {
return KapuaEid.parseCompactId(userResponse.getScopeId());

Check warning on line 414 in broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java

View check run for this annotation

Codecov / codecov/patch

broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java#L414

Added line #L414 was not covered by tests
}
} catch (JsonProcessingException | JMSException | InterruptedException e) {
logger.warn("Error getting scopeId for username {}", username, e);

Check warning on line 417 in broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java

View check run for this annotation

Codecov / codecov/patch

broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java#L416-L417

Added lines #L416 - L417 were not covered by tests
}
throw new SecurityException("User not authorized! Cannot get scopeId for username:" + username);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.kapua.client.security;

import com.fasterxml.jackson.core.JsonProcessingException;

import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.bean.AuthRequest;
import org.eclipse.kapua.client.security.bean.AuthResponse;
Expand Down Expand Up @@ -44,7 +45,6 @@ public ServiceClientMessagingImpl(MessageListener messageListener, Client client

@Override
public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException)
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerConnect.name());
Expand All @@ -60,7 +60,6 @@ public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedExc

@Override
public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSException, InterruptedException, JsonProcessingException {
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerDisconnect.name());
Expand All @@ -76,7 +75,6 @@ public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSExceptio

@Override
public EntityResponse getEntity(EntityRequest entityRequest) throws JMSException, InterruptedException, JsonProcessingException {
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
entityRequest.setRequestId(requestId);
ResponseContainer<EntityResponse> responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, entityRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
Expand All @@ -32,36 +33,39 @@ public class Client {

private static Logger logger = LoggerFactory.getLogger(Client.class);

private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000;

private ConnectionFactory connectionFactory;//is this reference needed?
private Connection connection;//keep to implement cleanup (and object lifecycle)
private Session session;
private MessageConsumer consumer;
private MessageProducer producer;
private String clientId;
private String requestAddress;
private String replyAddress;
private ClientMessageListener clientMessageListener;
private ExceptionListener exceptionListener;

private ConnectionStatus connectionStatus;
private boolean active;
private boolean connectionStatus;

public Client(String username, String password, String host, int port, String clientId,
String requestAddress, String replyAddress, ClientMessageListener clientMessageListener) throws JMSException {
connectionStatus = new ConnectionStatus();
this.clientId = clientId;
this.requestAddress = requestAddress;
this.replyAddress = replyAddress;
this.clientMessageListener = clientMessageListener;

Check warning on line 57 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L54-L57

Added lines #L54 - L57 were not covered by tests
connectionFactory = new JmsConnectionFactory(username, password, "amqp://" + host + ":" + port);
connection = connectionFactory.createConnection();
connection.setExceptionListener(new JMSExceptionListner(connectionStatus, clientId));
connection.setClientID(clientId);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("AMQP client binding request sender to: {}", requestAddress);
logger.info("AMQP client binding message listener to: {}", replyAddress);
consumer = session.createConsumer(session.createQueue(replyAddress));
consumer.setMessageListener(clientMessageListener);
producer = session.createProducer(session.createQueue(requestAddress));
clientMessageListener.init(session, producer);
connectionStatus.setConnectionAlive();
}
exceptionListener = new ExceptionListener() {

Check warning on line 59 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L59

Added line #L59 was not covered by tests

public void checkAuthServiceConnection() {
if (!isConnected()) {
//TODO throw exception and deny operations
}
@Override
public void onException(JMSException exception) {
connectionStatus = false;
connect();
}

Check warning on line 65 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L63-L65

Added lines #L63 - L65 were not covered by tests
};
active = true;
connect();

Check warning on line 68 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L67-L68

Added lines #L67 - L68 were not covered by tests
}

public void sendMessage(TextMessage message) throws JMSException {
Expand All @@ -73,12 +77,61 @@ public TextMessage createTextMessage() throws JMSException {
return session.createTextMessage();
}

public boolean isConnected() {
public void stop() throws JMSException {
active = false;
disconnect();
}

Check warning on line 83 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L81-L83

Added lines #L81 - L83 were not covered by tests

private void disconnect() throws JMSException {
if (connection != null) {
connection.close();

Check warning on line 87 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L87

Added line #L87 was not covered by tests
}
}

Check warning on line 89 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L89

Added line #L89 was not covered by tests

private void connect() {
int connectAttempt = 0;

Check warning on line 92 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L92

Added line #L92 was not covered by tests
while (active && !isConnected()) {
logger.info("Connect attempt {}...", connectAttempt);

Check warning on line 94 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L94

Added line #L94 was not covered by tests
try {
logger.info("Service client {} - restarting attempt... {}", this, connectAttempt);
disconnect();
connection = connectionFactory.createConnection();
connection.setExceptionListener(exceptionListener);
connection.setClientID(clientId);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("AMQP client binding request sender to: {}", requestAddress);
logger.info("AMQP client binding message listener to: {}", replyAddress);
consumer = session.createConsumer(session.createQueue(replyAddress));
consumer.setMessageListener(clientMessageListener);
producer = session.createProducer(session.createQueue(requestAddress));
clientMessageListener.init(session, producer);
connectionStatus = true;
logger.info("Service client {} - restarting attempt... {} DONE (Connection restored)", this, connectAttempt);
} catch (JMSException e) {
logger.info("Connect attempt {}... FAIL", connectAttempt, e);

Check warning on line 112 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L96-L112

Added lines #L96 - L112 were not covered by tests
//wait a bit
waitBeforeRetry();
}
connectAttempt++;

Check warning on line 116 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L114-L116

Added lines #L114 - L116 were not covered by tests
}
}

Check warning on line 118 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L118

Added line #L118 was not covered by tests

private boolean isConnected() {
try {
return connection!=null && connection.getClientID()!=null && connectionStatus.isAlive();
return connection!=null && connection.getClientID()!=null && connectionStatus;
} catch (JMSException e) {
logger.warn("Error while validating connection: {}", e.getMessage(), e);
return false;
}
}

private void waitBeforeRetry() {
try {
Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT);
} catch (InterruptedException e) {
logger.error("Wait for connect interrupted!", e);
}
}

Check warning on line 135 in client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java#L131-L135

Added lines #L131 - L135 were not covered by tests

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDrive

private static final Logger LOGGER = LoggerFactory.getLogger(JMSServiceEventBus.class);

private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000;

private final int producerPoolMinSize;
private final int producerPoolMaxSize;
private final int producerPoolBorrowWait;
Expand Down Expand Up @@ -156,9 +158,8 @@ public ServiceEventBus getEventBus() {
}

private void waitBeforeRetry() {
// wait a bit
try {
Thread.sleep(2000);// TODO move to configuration
Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT);

Check warning on line 162 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L162

Added line #L162 was not covered by tests
} catch (InterruptedException e) {
LOGGER.error("Wait for connect interrupted!", e);
}
Expand Down Expand Up @@ -318,7 +319,7 @@ synchronized void subscribe(Subscription subscription)
final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = jmsSession.createTopic(subscriptionStr);
for (int i = 0; i < consumerPoolSize; i++) {
MessageConsumer jmsConsumer = jmsSession.createSharedDurableConsumer(jmsTopic, subscription.getName());
MessageConsumer jmsConsumer = jmsSession.createSharedConsumer(jmsTopic, subscription.getName());

Check warning on line 322 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L322

Added line #L322 was not covered by tests
jmsConsumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage) {
Expand Down Expand Up @@ -461,6 +462,7 @@ public void onException(JMSException e) {
break;
} catch (ServiceEventBusException | JMSException e1) {
LOGGER.error("EventBus Listener {} - Cannot start new event bus connection... try again...", this, e1);
// wait a bit
waitBeforeRetry();
}
i++;
Expand Down

0 comments on commit f2243bc

Please sign in to comment.