Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

:fix: add auto reconnect to service client #4012

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,14 @@ private AccountInfo getAdminAccountInfoNoCache() throws JsonProcessingException,
SecurityAction.getEntity.name(),
EntityType.account.name(),
SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_ACCOUNT));
EntityResponse accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest);
if (accountResponse != null) {
return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName());
EntityResponse accountResponse;
try {
accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest);
if (accountResponse != null) {
return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName());
}
} catch (JsonProcessingException | JMSException | InterruptedException e) {
logger.warn("Error getting scopeId for user admin", e);
}
throw new SecurityException("User not authorized! Cannot get Admin Account info!");
}
Expand All @@ -403,9 +408,13 @@ private KapuaId getScopeIdNoCache(String username) throws JsonProcessingExceptio
SecurityAction.getEntity.name(),
EntityType.user.name(),
username);
EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest);
if (userResponse != null && userResponse.getScopeId() != null) {
return KapuaEid.parseCompactId(userResponse.getScopeId());
try {
EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest);
if (userResponse != null && userResponse.getScopeId() != null) {
return KapuaEid.parseCompactId(userResponse.getScopeId());
}
} catch (JsonProcessingException | JMSException | InterruptedException e) {
logger.warn("Error getting scopeId for username {}", username, e);
}
throw new SecurityException("User not authorized! Cannot get scopeId for username:" + username);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.kapua.client.security;

import com.fasterxml.jackson.core.JsonProcessingException;

import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.bean.AuthRequest;
import org.eclipse.kapua.client.security.bean.AuthResponse;
Expand Down Expand Up @@ -44,7 +45,6 @@ public ServiceClientMessagingImpl(MessageListener messageListener, Client client

@Override
public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException)
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerConnect.name());
Expand All @@ -60,7 +60,6 @@ public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedExc

@Override
public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSException, InterruptedException, JsonProcessingException {
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerDisconnect.name());
Expand All @@ -76,7 +75,6 @@ public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSExceptio

@Override
public EntityResponse getEntity(EntityRequest entityRequest) throws JMSException, InterruptedException, JsonProcessingException {
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
entityRequest.setRequestId(requestId);
ResponseContainer<EntityResponse> responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, entityRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
Expand All @@ -32,36 +33,39 @@ public class Client {

private static Logger logger = LoggerFactory.getLogger(Client.class);

private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000;

private ConnectionFactory connectionFactory;//is this reference needed?
private Connection connection;//keep to implement cleanup (and object lifecycle)
private Session session;
private MessageConsumer consumer;
private MessageProducer producer;
private String clientId;
private String requestAddress;
private String replyAddress;
private ClientMessageListener clientMessageListener;
private ExceptionListener exceptionListener;

private ConnectionStatus connectionStatus;
private boolean active;
private boolean connectionStatus;

public Client(String username, String password, String host, int port, String clientId,
String requestAddress, String replyAddress, ClientMessageListener clientMessageListener) throws JMSException {
connectionStatus = new ConnectionStatus();
this.clientId = clientId;
this.requestAddress = requestAddress;
this.replyAddress = replyAddress;
this.clientMessageListener = clientMessageListener;
connectionFactory = new JmsConnectionFactory(username, password, "amqp://" + host + ":" + port);
connection = connectionFactory.createConnection();
connection.setExceptionListener(new JMSExceptionListner(connectionStatus, clientId));
connection.setClientID(clientId);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("AMQP client binding request sender to: {}", requestAddress);
logger.info("AMQP client binding message listener to: {}", replyAddress);
consumer = session.createConsumer(session.createQueue(replyAddress));
consumer.setMessageListener(clientMessageListener);
producer = session.createProducer(session.createQueue(requestAddress));
clientMessageListener.init(session, producer);
connectionStatus.setConnectionAlive();
}
exceptionListener = new ExceptionListener() {

public void checkAuthServiceConnection() {
if (!isConnected()) {
//TODO throw exception and deny operations
}
@Override
public void onException(JMSException exception) {
connectionStatus = false;
connect();
}
};
active = true;
connect();
}

public void sendMessage(TextMessage message) throws JMSException {
Expand All @@ -73,12 +77,62 @@ public TextMessage createTextMessage() throws JMSException {
return session.createTextMessage();
}

public boolean isConnected() {
public void stop() throws JMSException {
active = false;
disconnect();
}

private void disconnect() throws JMSException {
connectionStatus = false;
if (connection != null) {
connection.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we set the connectionStatus here to false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was relying on the ExceptionListener but you are right: it's better to set it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay let's add it in, then I'd be okay approving this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

}
}

private void connect() {
int connectAttempt = 0;
while (active && !isConnected()) {
logger.info("Connect attempt {}...", connectAttempt);
try {
logger.info("Service client {} - restarting attempt... {}", this, connectAttempt);
disconnect();
connection = connectionFactory.createConnection();
connection.setExceptionListener(exceptionListener);
connection.setClientID(clientId);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("AMQP client binding request sender to: {}", requestAddress);
logger.info("AMQP client binding message listener to: {}", replyAddress);
consumer = session.createConsumer(session.createQueue(replyAddress));
consumer.setMessageListener(clientMessageListener);
producer = session.createProducer(session.createQueue(requestAddress));
clientMessageListener.init(session, producer);
connectionStatus = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there not a way to read the connection status from one of these objects instead of managing it ourselves? Should it be set to false on exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately no. The javax.jms.Connection has no connect flag o isConnect method.

logger.info("Service client {} - restarting attempt... {} DONE (Connection restored)", this, connectAttempt);
} catch (JMSException e) {
logger.info("Connect attempt {}... FAIL", connectAttempt, e);
//wait a bit
waitBeforeRetry();
}
connectAttempt++;
}
}

private boolean isConnected() {
try {
return connection!=null && connection.getClientID()!=null && connectionStatus.isAlive();
return connection!=null && connection.getClientID()!=null && connectionStatus;
} catch (JMSException e) {
logger.warn("Error while validating connection: {}", e.getMessage(), e);
return false;
}
}

private void waitBeforeRetry() {
try {
Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT);
elbert3 marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
logger.error("Wait for connect interrupted!", e);
}
}

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDrive

private static final Logger LOGGER = LoggerFactory.getLogger(JMSServiceEventBus.class);

private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000;

private final int producerPoolMinSize;
private final int producerPoolMaxSize;
private final int producerPoolBorrowWait;
Expand Down Expand Up @@ -156,9 +158,8 @@ public ServiceEventBus getEventBus() {
}

private void waitBeforeRetry() {
// wait a bit
try {
Thread.sleep(2000);// TODO move to configuration
Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT);
elbert3 marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
LOGGER.error("Wait for connect interrupted!", e);
}
Expand Down Expand Up @@ -461,6 +462,7 @@ public void onException(JMSException e) {
break;
} catch (ServiceEventBusException | JMSException e1) {
LOGGER.error("EventBus Listener {} - Cannot start new event bus connection... try again...", this, e1);
// wait a bit
waitBeforeRetry();
}
i++;
Expand Down
Loading