-
Notifications
You must be signed in to change notification settings - Fork 161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
:fix: add auto reconnect to service client #4012
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
|
||
import javax.jms.Connection; | ||
import javax.jms.ConnectionFactory; | ||
import javax.jms.ExceptionListener; | ||
import javax.jms.JMSException; | ||
import javax.jms.MessageConsumer; | ||
import javax.jms.MessageProducer; | ||
|
@@ -32,36 +33,39 @@ public class Client { | |
|
||
private static Logger logger = LoggerFactory.getLogger(Client.class); | ||
|
||
private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000; | ||
|
||
private ConnectionFactory connectionFactory;//is this reference needed? | ||
private Connection connection;//keep to implement cleanup (and object lifecycle) | ||
private Session session; | ||
private MessageConsumer consumer; | ||
private MessageProducer producer; | ||
private String clientId; | ||
private String requestAddress; | ||
private String replyAddress; | ||
private ClientMessageListener clientMessageListener; | ||
private ExceptionListener exceptionListener; | ||
|
||
private ConnectionStatus connectionStatus; | ||
private boolean active; | ||
private boolean connectionStatus; | ||
|
||
public Client(String username, String password, String host, int port, String clientId, | ||
String requestAddress, String replyAddress, ClientMessageListener clientMessageListener) throws JMSException { | ||
connectionStatus = new ConnectionStatus(); | ||
this.clientId = clientId; | ||
this.requestAddress = requestAddress; | ||
this.replyAddress = replyAddress; | ||
this.clientMessageListener = clientMessageListener; | ||
connectionFactory = new JmsConnectionFactory(username, password, "amqp://" + host + ":" + port); | ||
connection = connectionFactory.createConnection(); | ||
connection.setExceptionListener(new JMSExceptionListner(connectionStatus, clientId)); | ||
connection.setClientID(clientId); | ||
connection.start(); | ||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | ||
logger.info("AMQP client binding request sender to: {}", requestAddress); | ||
logger.info("AMQP client binding message listener to: {}", replyAddress); | ||
consumer = session.createConsumer(session.createQueue(replyAddress)); | ||
consumer.setMessageListener(clientMessageListener); | ||
producer = session.createProducer(session.createQueue(requestAddress)); | ||
clientMessageListener.init(session, producer); | ||
connectionStatus.setConnectionAlive(); | ||
} | ||
exceptionListener = new ExceptionListener() { | ||
|
||
public void checkAuthServiceConnection() { | ||
if (!isConnected()) { | ||
//TODO throw exception and deny operations | ||
} | ||
@Override | ||
public void onException(JMSException exception) { | ||
connectionStatus = false; | ||
connect(); | ||
} | ||
}; | ||
active = true; | ||
connect(); | ||
} | ||
|
||
public void sendMessage(TextMessage message) throws JMSException { | ||
|
@@ -73,12 +77,62 @@ public TextMessage createTextMessage() throws JMSException { | |
return session.createTextMessage(); | ||
} | ||
|
||
public boolean isConnected() { | ||
public void stop() throws JMSException { | ||
active = false; | ||
disconnect(); | ||
} | ||
|
||
private void disconnect() throws JMSException { | ||
connectionStatus = false; | ||
if (connection != null) { | ||
connection.close(); | ||
} | ||
} | ||
|
||
private void connect() { | ||
int connectAttempt = 0; | ||
while (active && !isConnected()) { | ||
logger.info("Connect attempt {}...", connectAttempt); | ||
try { | ||
logger.info("Service client {} - restarting attempt... {}", this, connectAttempt); | ||
disconnect(); | ||
connection = connectionFactory.createConnection(); | ||
connection.setExceptionListener(exceptionListener); | ||
connection.setClientID(clientId); | ||
connection.start(); | ||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | ||
logger.info("AMQP client binding request sender to: {}", requestAddress); | ||
logger.info("AMQP client binding message listener to: {}", replyAddress); | ||
consumer = session.createConsumer(session.createQueue(replyAddress)); | ||
consumer.setMessageListener(clientMessageListener); | ||
producer = session.createProducer(session.createQueue(requestAddress)); | ||
clientMessageListener.init(session, producer); | ||
connectionStatus = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there not a way to read the connection status from one of these objects instead of managing it ourselves? Should it be set to false on exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately no. The javax.jms.Connection has no connect flag o isConnect method. |
||
logger.info("Service client {} - restarting attempt... {} DONE (Connection restored)", this, connectAttempt); | ||
} catch (JMSException e) { | ||
logger.info("Connect attempt {}... FAIL", connectAttempt, e); | ||
//wait a bit | ||
waitBeforeRetry(); | ||
} | ||
connectAttempt++; | ||
} | ||
} | ||
|
||
private boolean isConnected() { | ||
try { | ||
return connection!=null && connection.getClientID()!=null && connectionStatus.isAlive(); | ||
return connection!=null && connection.getClientID()!=null && connectionStatus; | ||
} catch (JMSException e) { | ||
logger.warn("Error while validating connection: {}", e.getMessage(), e); | ||
return false; | ||
} | ||
} | ||
|
||
private void waitBeforeRetry() { | ||
try { | ||
Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT); | ||
elbert3 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} catch (InterruptedException e) { | ||
logger.error("Wait for connect interrupted!", e); | ||
} | ||
} | ||
|
||
} |
This file was deleted.
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we set the connectionStatus here to false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was relying on the ExceptionListener but you are right: it's better to set it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay let's add it in, then I'd be okay approving this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!