diff --git a/README.md b/README.md index 9bca03bc5..dc3f7ca01 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 common library (Java) (3.33.0) +# th2 common library (Java) (3.34.0) ## Usage @@ -35,8 +35,9 @@ Then you will create an instance of imported class, by choosing one of the follo 1. rabbitMq.json - configuration for RabbitMQ 2. mq.json - configuration for MessageRouter 3. grpc.json - configuration for GrpcRouter - 4. cradle.json - configuration for cradle - 5. custom.json - custom configuration + 4. cradle.json - confidential configuration for cradle + 5. cradle_manager.json - non confidential configuration for cradle + 6. custom.json - custom configuration The second group: * --namespace - the namespace in Kubernetes to search config maps @@ -62,7 +63,7 @@ Then you will create an instance of imported class, by choosing one of the follo ### Configuration formats -The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file. +The `CommonFactory` reads a RabbitMQ configuration from the `rabbitMQ.json` file. * host - the required setting defines the RabbitMQ host. * vHost - the required setting defines the virtual host that will be used for connecting to RabbitMQ. Please see more details about the virtual host in RabbitMQ via [link](https://www.rabbitmq.com/vhosts.html) @@ -72,14 +73,6 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file. * password - the required setting defines the password that will be used for connecting to RabbitMQ. * exchangeName - the required setting defines the exchange that will be used for sending/subscribing operation in MQ routers. Please see more details about the exchanges in RabbitMQ via [link](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges) -* connectionTimeout - the connection TCP establishment timeout in milliseconds with its default value set to 60000. Use zero for infinite waiting. -* connectionCloseTimeout - the timeout in milliseconds for completing all the close-related operations, use -1 for infinity, the default value is set to 10000. -* maxRecoveryAttempts - this option defines the number of reconnection attempts to RabbitMQ, with its default value set to 5. - The `th2_readiness` probe is set to false and publishers are blocked after a lost connection to RabbitMQ. The `th2_readiness` probe is reverted to true if the connection will be recovered during specified attempts otherwise the `th2_liveness` probe will be set to false. -* minConnectionRecoveryTimeout - this option defines a minimal interval in milliseconds between reconnect attempts, with its default value set to 10000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout. -* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout. -* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10. -* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100 ```json { @@ -88,14 +81,34 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file. "port": 5672, "username": "", "password": "", - "exchangeName": "", + "exchangeName": "" +} +``` + +The `CommonFactory` reads a RabbitMQ connection configuration from the `mq_router.json` file. +* subscriberName - the client-generated consumer tag to establish context, default = (`rabbit_mq_subscriber.` + current time in milliseconds) +* connectionTimeout - the connection TCP establishment timeout in milliseconds with its default value set to 60000. Use zero for infinite waiting. +* connectionCloseTimeout - the timeout in milliseconds for completing all the close-related operations, use -1 for infinity, the default value is set to 10000. +* maxRecoveryAttempts - this option defines the number of reconnection attempts to RabbitMQ, with its default value set to 5. + The `th2_readiness` probe is set to false and publishers are blocked after a lost connection to RabbitMQ. The `th2_readiness` probe is reverted to true if the connection will be recovered during specified attempts otherwise the `th2_liveness` probe will be set to false. +* minConnectionRecoveryTimeout - this option defines a minimal interval in milliseconds between reconnect attempts, with its default value set to 10000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout. +* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout. +* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10. +* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100 +* virtualPublishLimit - MQ router requests actual queue set bound to routing key related to publish pin and then get maximum queue size to this value. The router blocks sending via pin where the limit is exceeded and execute regular check of the current state of the bound queues. The block will be lifted when the max size of queues go down less than the virtual publish limit. +* secondsToCheckVirtualPublishLimit - this option defines an interval in seconds between size check attempts, default = 10 +```json +{ + "subscriberName": "", "connectionTimeout": 60000, "connectionCloseTimeout": 10000, "maxRecoveryAttempts": 5, "minConnectionRecoveryTimeout": 10000, "maxConnectionRecoveryTimeout": 60000, "prefetchCount": 10, - "messageRecursionLimit": 100 + "messageRecursionLimit": 100, + "virtualPublishLimit": 10000, + "secondsToCheckVirtualPublishLimit": 10 } ``` @@ -160,7 +173,7 @@ Filters format: } ``` -The `CommonFactory` reads a Cradle configuration from the cradle.json file. +The `CommonFactory` reads a Cradle configuration from the `cradle.json` file. * dataCenter - the required setting defines the data center in the Cassandra cluster. * host - the required setting defines the Cassandra host. * port - the required setting defines the Cassandra port. @@ -168,10 +181,6 @@ The `CommonFactory` reads a Cradle configuration from the cradle.json file. * username - the required setting defines the Cassandra username. The user must have permission to write data using a specified keyspace. * password - the required setting defines the password that will be used for connecting to Cassandra. * cradleInstanceName - this option defines a special identifier that divides data within one keyspace with infra set as the default value. -* cradleMaxEventBatchSize - this option defines the maximum event batch size in bytes with its default value set to 1048576. -* cradleMaxMessageBatchSize - this option defines the maximum message batch size in bytes with its default value set to 1048576. -* timeout - this option defines connection timeout in milliseconds. If set to 0 or ommited, the default value of 5000 is used. -* pageSize - this option defines the size of the result set to fetch at a time. If set to 0 or ommited, the default value of 5000 is used. ```json { @@ -181,7 +190,18 @@ The `CommonFactory` reads a Cradle configuration from the cradle.json file. "keyspace": "", "username": "", "password": "", - "cradleInstanceName": "", + "cradleInstanceName": "" +} +``` + +The `CommonFactory` reads a Cradle configuration from the `cradle_manager.json` file. +* cradleMaxEventBatchSize - this option defines the maximum event batch size in bytes with its default value set to 1048576. +* cradleMaxMessageBatchSize - this option defines the maximum message batch size in bytes with its default value set to 1048576. +* timeout - this option defines connection timeout in milliseconds. If set to 0 or ommited, the default value of 5000 is used. +* pageSize - this option defines the size of the result set to fetch at a time. If set to 0 or ommited, the default value of 5000 is used. + +```json +{ "cradleMaxEventBatchSize": 1048576, "cradleMaxMessageBatchSize": 1048576, "timeout": 5000, @@ -288,6 +308,11 @@ dependencies { ## Release notes +### 3.34.0 + ++ Added backpressure support: lock sending if queue virtual size limit is exceeded ++ Added parameters `virtualPublishLimit` and `secondsToCheckVirtualPublishLimit` to `mq_router.json` + ### 3.33.0 + Added ability to read dictionaries by aliases and as group of all available aliases diff --git a/build.gradle b/build.gradle index a61424583..037162966 100644 --- a/build.gradle +++ b/build.gradle @@ -171,6 +171,7 @@ dependencies { implementation "io.grpc:grpc-netty" implementation "com.rabbitmq:amqp-client" + implementation 'com.rabbitmq:http-client:4.0.0' implementation "org.jetbrains:annotations" diff --git a/gradle.properties b/gradle.properties index 3d838c7a2..a91acb1d6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ # -# Copyright 2020-2021 Exactpro (Exactpro Systems Limited) +# Copyright 2020-2022 Exactpro (Exactpro Systems Limited) # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -13,7 +13,7 @@ # limitations under the License. # -release_version=3.33.0 +release_version=3.34.0 description = 'th2 common library (Java)' diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java index 751bc4113..90e577efb 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ public abstract class AbstractRabbitSender implements MessageSender { private final AtomicReference exchangeName = new AtomicReference<>(); private final AtomicReference connectionManager = new AtomicReference<>(); private final String th2Type; + private long sentBeforeQueueSizeCheck; public AbstractRabbitSender( @NotNull ConnectionManager connectionManager, @@ -81,7 +82,7 @@ public void send(T value) throws IOException { requireNonNull(value, "Value for send can not be null"); try { - ConnectionManager connection = this.connectionManager.get(); + ConnectionManager connectionManager = this.connectionManager.get(); byte[] bytes = valueToBytes(value); MESSAGE_SIZE_PUBLISH_BYTES .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) @@ -89,7 +90,12 @@ public void send(T value) throws IOException { MESSAGE_PUBLISH_TOTAL .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) .inc(); - connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes); + sentBeforeQueueSizeCheck++; + if (sentBeforeQueueSizeCheck > connectionManager.getConnectionManagerConfiguration().getVirtualPublishLimit()) { + connectionManager.lockSendingIfSizeLimitExceeded(); + sentBeforeQueueSizeCheck = 0; + } + connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes); if (LOGGER.isTraceEnabled()) { LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'", diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java deleted file mode 100644 index c073a711f..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ /dev/null @@ -1,504 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection; - -import com.exactpro.th2.common.metrics.HealthMetrics; -import com.exactpro.th2.common.schema.message.SubscriberMonitor; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.BlockedListener; -import com.rabbitmq.client.CancelCallback; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Consumer; -import com.rabbitmq.client.DeliverCallback; -import com.rabbitmq.client.ExceptionHandler; -import com.rabbitmq.client.Recoverable; -import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.ShutdownNotifier; -import com.rabbitmq.client.TopologyRecoveryException; -import org.apache.commons.lang3.ObjectUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; -import java.util.function.Supplier; - -public class ConnectionManager implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); - - private final Connection connection; - private final Map channelsByPin = new ConcurrentHashMap<>(); - private final AtomicInteger connectionRecoveryAttempts = new AtomicInteger(0); - private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false); - private final ConnectionManagerConfiguration configuration; - private final String subscriberName; - private final AtomicInteger nextSubscriberId = new AtomicInteger(1); - private final ExecutorService sharedExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("rabbitmq-shared-pool-%d") - .build()); - - private final HealthMetrics metrics = new HealthMetrics(this); - - private final RecoveryListener recoveryListener = new RecoveryListener() { - @Override - public void handleRecovery(Recoverable recoverable) { - LOGGER.debug("Count tries to recovery connection reset to 0"); - connectionRecoveryAttempts.set(0); - metrics.getReadinessMonitor().enable(); - LOGGER.debug("Set RabbitMQ readiness to true"); - } - - @Override - public void handleRecoveryStarted(Recoverable recoverable) {} - }; - - public ConnectionManagerConfiguration getConfiguration() { - return configuration; - } - - public ConnectionManager(@NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration, Runnable onFailedRecoveryConnection) { - Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); - this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); - - String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), rabbitMQConfiguration.getSubscriberName()); - if (StringUtils.isBlank(subscriberNameTmp)) { - subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis(); - LOGGER.info("Subscribers will use default name: {}", subscriberName); - } else { - subscriberName = subscriberNameTmp + "." + System.currentTimeMillis(); - } - - var factory = new ConnectionFactory(); - var virtualHost = rabbitMQConfiguration.getVHost(); - var username = rabbitMQConfiguration.getUsername(); - var password = rabbitMQConfiguration.getPassword(); - - factory.setHost(rabbitMQConfiguration.getHost()); - factory.setPort(rabbitMQConfiguration.getPort()); - - if (StringUtils.isNotBlank(virtualHost)) { - factory.setVirtualHost(virtualHost); - } - - if (StringUtils.isNotBlank(username)) { - factory.setUsername(username); - } - - if (StringUtils.isNotBlank(password)) { - factory.setPassword(password); - } - - if (connectionManagerConfiguration.getConnectionTimeout() > 0) { - factory.setConnectionTimeout(connectionManagerConfiguration.getConnectionTimeout()); - } - - factory.setExceptionHandler(new ExceptionHandler() { - @Override - public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleReturnListenerException(Channel channel, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleConfirmListenerException(Channel channel, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleBlockedListenerException(Connection connection, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) { - turnOffReadiness(exception); - } - - @Override - public void handleConnectionRecoveryException(Connection conn, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleChannelRecoveryException(Channel ch, Throwable exception) { - turnOffReadiness(exception); - } - - @Override - public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) { - turnOffReadiness(exception); - } - - private void turnOffReadiness(Throwable exception){ - metrics.getReadinessMonitor().disable(); - LOGGER.debug("Set RabbitMQ readiness to false. RabbitMQ error", exception); - } - }); - - factory.setAutomaticRecoveryEnabled(true); - factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> { - if (connectionIsClosed.get()) { - return false; - } - - int tmpCountTriesToRecovery = connectionRecoveryAttempts.get(); - - if (tmpCountTriesToRecovery < connectionManagerConfiguration.getMaxRecoveryAttempts()) { - LOGGER.info("Try to recovery connection to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery + 1); - return true; - } - LOGGER.error("Can not connect to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery); - if (onFailedRecoveryConnection != null) { - onFailedRecoveryConnection.run(); - } else { - // TODO: we should stop the execution of the application. Don't use System.exit!!! - throw new IllegalStateException("Cannot recover connection to RabbitMQ"); - } - return false; - }); - - factory.setRecoveryDelayHandler(recoveryAttempts -> { - int tmpCountTriesToRecovery = connectionRecoveryAttempts.getAndIncrement(); - - int recoveryDelay = connectionManagerConfiguration.getMinConnectionRecoveryTimeout() - + (connectionManagerConfiguration.getMaxRecoveryAttempts() > 1 - ? (connectionManagerConfiguration.getMaxConnectionRecoveryTimeout() - connectionManagerConfiguration.getMinConnectionRecoveryTimeout()) - / (connectionManagerConfiguration.getMaxRecoveryAttempts() - 1) - * tmpCountTriesToRecovery - : 0); - - LOGGER.info("Recovery delay for '{}' try = {}", tmpCountTriesToRecovery, recoveryDelay); - return recoveryDelay; - } - ); - factory.setSharedExecutor(sharedExecutor); - - try { - this.connection = factory.newConnection(); - metrics.getReadinessMonitor().enable(); - LOGGER.debug("Set RabbitMQ readiness to true"); - } catch (IOException | TimeoutException e) { - metrics.getReadinessMonitor().disable(); - LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e); - throw new IllegalStateException("Failed to create RabbitMQ connection using configuration", e); - } - - this.connection.addBlockedListener(new BlockedListener() { - @Override - public void handleBlocked(String reason) throws IOException { - LOGGER.warn("RabbitMQ blocked connection: {}", reason); - } - - @Override - public void handleUnblocked() throws IOException { - LOGGER.warn("RabbitMQ unblocked connection"); - } - }); - - if (this.connection instanceof Recoverable) { - Recoverable recoverableConnection = (Recoverable) this.connection; - recoverableConnection.addRecoveryListener(recoveryListener); - LOGGER.debug("Recovery listener was added to connection."); - } else { - throw new IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it"); - } - } - - public boolean isOpen() { - return connection.isOpen() && !connectionIsClosed.get(); - } - - @Override - public void close() { - if (connectionIsClosed.getAndSet(true)) { - return; - } - - int closeTimeout = configuration.getConnectionCloseTimeout(); - if (connection.isOpen()) { - try { - // We close the connection and don't close channels - // because when a channel's connection is closed, so is the channel - connection.close(closeTimeout); - } catch (IOException e) { - LOGGER.error("Cannot close connection", e); - } - } - - shutdownSharedExecutor(closeTimeout); - } - - public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { - ChannelHolder holder = getChannelFor(PinId.forRoutingKey(routingKey)); - holder.withLock(channel -> { - channel.basicPublish(exchange, routingKey, props, body); - }); - } - - public SubscriberMonitor basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - ChannelHolder holder = getChannelFor(PinId.forQueue(queue)); - String tag = holder.mapWithLock(channel -> { - return channel.basicConsume(queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), (tagTmp, delivery) -> { - try { - try { - deliverCallback.handle(tagTmp, delivery); - } finally { - holder.withLock(ch -> basicAck(ch, delivery.getEnvelope().getDeliveryTag())); - } - } catch (IOException | RuntimeException e) { - LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.getMessage(), e); - } - }, cancelCallback); - }); - - return new RabbitMqSubscriberMonitor(holder, tag, this::basicCancel); - } - - private void basicCancel(Channel channel, String consumerTag) throws IOException { - channel.basicCancel(consumerTag); - } - - private void shutdownSharedExecutor(int closeTimeout) { - sharedExecutor.shutdown(); - try { - if (!sharedExecutor.awaitTermination(closeTimeout, TimeUnit.MILLISECONDS)) { - LOGGER.error("Executor is not terminated during {} millis", closeTimeout); - List runnables = sharedExecutor.shutdownNow(); - LOGGER.error("{} task(s) was(were) not finished", runnables.size()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private ChannelHolder getChannelFor(PinId pinId) { - return channelsByPin.computeIfAbsent(pinId, ignore -> { - LOGGER.trace("Creating channel holder for {}", pinId); - return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery); - }); - } - - private Channel createChannel() { - waitForConnectionRecovery(connection); - - try { - Channel channel = connection.createChannel(); - Objects.requireNonNull(channel, () -> "No channels are available in the connection. Max channel number: " + connection.getChannelMax()); - channel.basicQos(configuration.getPrefetchCount()); - channel.addReturnListener(ret -> - LOGGER.warn("Can not router message to exchange '{}', routing key '{}'. Reply code '{}' and text = {}", ret.getExchange(), ret.getRoutingKey(), ret.getReplyCode(), ret.getReplyText())); - return channel; - } catch (IOException e) { - throw new IllegalStateException("Can not create channel", e); - } - } - - private void waitForConnectionRecovery(ShutdownNotifier notifier) { - waitForConnectionRecovery(notifier, true); - } - - private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitForRecovery) { - if (isConnectionRecovery(notifier)) { - if (waitForRecovery) { - waitForRecovery(notifier); - } else { - LOGGER.warn("Skip waiting for connection recovery"); - } - } - - if (connectionIsClosed.get()) { - throw new IllegalStateException("Connection is already closed"); - } - } - - private void waitForRecovery(ShutdownNotifier notifier) { - LOGGER.warn("Start waiting for connection recovery"); - while (isConnectionRecovery(notifier)) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - LOGGER.error("Wait for connection recovery was interrupted", e); - break; - } - } - LOGGER.info("Stop waiting for connection recovery"); - } - - private boolean isConnectionRecovery(ShutdownNotifier notifier) { - return !notifier.isOpen() && !connectionIsClosed.get(); - } - - /** - * @param channel pass channel witch used for basicConsume, because delivery tags are scoped per channel, - * deliveries must be acknowledged on the same channel they were received on. - * @throws IOException - */ - private void basicAck(Channel channel, long deliveryTag) throws IOException { - channel.basicAck(deliveryTag, false); - } - - private static class RabbitMqSubscriberMonitor implements SubscriberMonitor { - - private final ChannelHolder holder; - private final String tag; - private final CancelAction action; - - public RabbitMqSubscriberMonitor(ChannelHolder holder, String tag, - CancelAction action) { - this.holder = holder; - this.tag = tag; - this.action = action; - } - - @Override - public void unsubscribe() throws Exception { - holder.withLock(false, channel -> action.execute(channel, tag)); - } - } - - private interface CancelAction { - void execute(Channel channel, String tag) throws IOException; - } - - private static class PinId { - private final String routingKey; - private final String queue; - - public static PinId forRoutingKey(String routingKey) { - return new PinId(routingKey, null); - } - - public static PinId forQueue(String queue) { - return new PinId(null, queue); - } - - private PinId(String routingKey, String queue) { - if (routingKey == null && queue == null) { - throw new NullPointerException("Either routingKey or queue must be set"); - } - this.routingKey = routingKey; - this.queue = queue; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - - if (o == null || getClass() != o.getClass()) return false; - - PinId pinId = (PinId) o; - - return new EqualsBuilder().append(routingKey, pinId.routingKey).append(queue, pinId.queue).isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(17, 37).append(routingKey).append(queue).toHashCode(); - } - - @Override - public String toString() { - return new ToStringBuilder(this, ToStringStyle.JSON_STYLE) - .append("routingKey", routingKey) - .append("queue", queue) - .toString(); - } - } - - private static class ChannelHolder { - private final Lock lock = new ReentrantLock(); - private final Supplier supplier; - private final BiConsumer reconnectionChecker; - private Channel channel; - - public ChannelHolder( - Supplier supplier, - BiConsumer reconnectionChecker - ) { - this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter"); - this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter"); - } - - public void withLock(ChannelConsumer consumer) throws IOException { - withLock(true, consumer); - } - - public void withLock(boolean waitForRecovery, ChannelConsumer consumer) throws IOException { - lock.lock(); - try { - consumer.consume(getChannel(waitForRecovery)); - } finally { - lock.unlock(); - } - } - - public T mapWithLock(ChannelMapper mapper) throws IOException { - lock.lock(); - try { - return mapper.map(getChannel()); - } finally { - lock.unlock(); - } - } - private Channel getChannel() { - return getChannel(true); - } - - - private Channel getChannel(boolean waitForRecovery) { - if (channel == null) { - channel = supplier.get(); - } - reconnectionChecker.accept(channel, waitForRecovery); - return channel; - } - } - - private interface ChannelMapper { - T map(Channel channel) throws IOException; - } - - private interface ChannelConsumer { - void consume(Channel channel) throws IOException; - } -} diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt index d7d60395c..51dc3baf1 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/RabbitMQConfiguration.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -26,7 +26,8 @@ data class RabbitMQConfiguration( @JsonProperty(required = true) var password: String, @Deprecated(message = "Please use subscriber name from ConnectionManagerConfiguration") var subscriberName: String? = null, //FIXME: Remove in future version - var exchangeName: String? = null) : Configuration() + var exchangeName: String? = null, +) : Configuration() data class ConnectionManagerConfiguration( var subscriberName: String? = null, @@ -36,5 +37,7 @@ data class ConnectionManagerConfiguration( var minConnectionRecoveryTimeout: Int = 10000, var maxConnectionRecoveryTimeout: Int = 60000, val prefetchCount: Int = 10, - val messageRecursionLimit: Int = 100 + val messageRecursionLimit: Int = 100, + var virtualPublishLimit: Long = 10000, + val secondsToCheckVirtualPublishLimit: Int = 10, ) : Configuration() \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.kt new file mode 100644 index 000000000..5de51efa1 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.kt @@ -0,0 +1,525 @@ +/* + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration +import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration +import java.lang.Runnable +import java.lang.AutoCloseable +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import com.exactpro.th2.common.metrics.HealthMetrics +import com.rabbitmq.client.RecoveryListener +import com.rabbitmq.client.Recoverable +import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.TopologyRecoveryException +import java.lang.IllegalStateException +import com.rabbitmq.client.RecoveryDelayHandler +import com.rabbitmq.http.client.ClientParameters +import java.io.IOException +import java.net.URISyntaxException +import com.rabbitmq.client.BlockedListener +import kotlin.Throws +import java.util.concurrent.TimeUnit +import com.rabbitmq.client.AMQP +import com.rabbitmq.client.DeliverCallback +import com.rabbitmq.client.CancelCallback +import com.exactpro.th2.common.schema.message.SubscriberMonitor +import java.lang.RuntimeException +import java.lang.InterruptedException +import java.util.function.BiConsumer +import com.rabbitmq.client.ShutdownNotifier +import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.rabbitmq.client.Channel +import com.rabbitmq.client.Connection +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.ExceptionHandler +import com.rabbitmq.http.client.Client +import com.rabbitmq.http.client.domain.DestinationType +import mu.KotlinLogging +import org.apache.commons.lang3.builder.EqualsBuilder +import org.apache.commons.lang3.builder.HashCodeBuilder +import org.apache.commons.lang3.builder.ToStringBuilder +import org.apache.commons.lang3.builder.ToStringStyle +import java.lang.NullPointerException +import java.util.Objects +import java.util.concurrent.TimeoutException +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock +import java.util.function.Supplier + +class ConnectionManager( + private val rabbitMQConfiguration: RabbitMQConfiguration, + val connectionManagerConfiguration: ConnectionManagerConfiguration, + onFailedRecoveryConnection: Runnable?, +) : AutoCloseable { + private val connection: Connection + private val channelsByPin: MutableMap = ConcurrentHashMap() + private val connectionRecoveryAttempts = AtomicInteger(0) + private val connectionIsClosed = AtomicBoolean(false) + + private val subscriberName: String + private val nextSubscriberId = AtomicInteger(1) + private val sharedExecutor = Executors.newSingleThreadExecutor(ThreadFactoryBuilder() + .setNameFormat("rabbitmq-shared-pool-%d") + .build()) + private val client: Client + private val sizeCheckExecutor = Executors.newScheduledThreadPool(1) + private val knownExchangesToRoutingKeys: MutableMap> = ConcurrentHashMap() + private val metrics = HealthMetrics(this) + private val recoveryListener: RecoveryListener = object : RecoveryListener { + override fun handleRecovery(recoverable: Recoverable) { + LOGGER.debug("Count tries to recovery connection reset to 0") + connectionRecoveryAttempts.set(0) + metrics.readinessMonitor.enable() + LOGGER.debug("Set RabbitMQ readiness to true") + } + + override fun handleRecoveryStarted(recoverable: Recoverable) {} + } + + init { + val subscriberNameTmp: String? = connectionManagerConfiguration.subscriberName + ?: rabbitMQConfiguration.subscriberName + if (subscriberNameTmp == null || subscriberNameTmp.isBlank()) { + subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis() + LOGGER.info("Subscribers will use default name: {}", subscriberName) + } else { + subscriberName = subscriberNameTmp + "." + System.currentTimeMillis() + } + val factory = ConnectionFactory().apply { + host = rabbitMQConfiguration.host + port = rabbitMQConfiguration.port + } + val virtualHost = rabbitMQConfiguration.vHost + if (virtualHost.isNotBlank()) { + factory.virtualHost = virtualHost + } + val username = rabbitMQConfiguration.username + if (username.isNotBlank()) { + factory.username = username + } + val password = rabbitMQConfiguration.password + if (password.isNotBlank()) { + factory.password = password + } + if (connectionManagerConfiguration.connectionTimeout > 0) { + factory.connectionTimeout = connectionManagerConfiguration.connectionTimeout + } + factory.exceptionHandler = object : ExceptionHandler { + override fun handleUnexpectedConnectionDriverException(conn: Connection, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleReturnListenerException(channel: Channel, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleConfirmListenerException(channel: Channel, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleBlockedListenerException(connection: Connection, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleConsumerException( + channel: Channel, + exception: Throwable, + consumer: Consumer, + consumerTag: String, + methodName: String, + ) { + turnOffReadiness(exception) + } + + override fun handleConnectionRecoveryException(conn: Connection, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleChannelRecoveryException(ch: Channel, exception: Throwable) { + turnOffReadiness(exception) + } + + override fun handleTopologyRecoveryException( + conn: Connection, + ch: Channel, + exception: TopologyRecoveryException, + ) { + turnOffReadiness(exception) + } + + private fun turnOffReadiness(exception: Throwable) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. RabbitMQ error", exception) + } + } + factory.isAutomaticRecoveryEnabled = true + factory.setConnectionRecoveryTriggeringCondition { + if (connectionIsClosed.get()) { + return@setConnectionRecoveryTriggeringCondition false + } + val tmpCountTriesToRecovery = connectionRecoveryAttempts.get() + if (tmpCountTriesToRecovery < connectionManagerConfiguration.maxRecoveryAttempts) { + LOGGER.info("Try to recovery connection to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery + 1) + return@setConnectionRecoveryTriggeringCondition true + } + LOGGER.error("Can not connect to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery) + if (onFailedRecoveryConnection != null) { + onFailedRecoveryConnection.run() + } else { + // TODO: we should stop the execution of the application. Don't use System.exit!!! + throw IllegalStateException("Cannot recover connection to RabbitMQ") + } + false + } + factory.recoveryDelayHandler = RecoveryDelayHandler { + val tmpCountTriesToRecovery = connectionRecoveryAttempts.getAndIncrement() + val recoveryDelay = (connectionManagerConfiguration.minConnectionRecoveryTimeout + + if (connectionManagerConfiguration.maxRecoveryAttempts > 1) (connectionManagerConfiguration.maxConnectionRecoveryTimeout - connectionManagerConfiguration.minConnectionRecoveryTimeout) + / (connectionManagerConfiguration.maxRecoveryAttempts - 1) + * tmpCountTriesToRecovery else 0) + LOGGER.info("Recovery delay for '{}' try = {}", tmpCountTriesToRecovery, recoveryDelay) + recoveryDelay.toLong() + } + factory.setSharedExecutor(sharedExecutor) + try { + connection = factory.newConnection() + client = Client( + ClientParameters() + .url(String.format(RABBITMQ_MANAGEMENT_URL, rabbitMQConfiguration.host)) + .username(rabbitMQConfiguration.username) + .password(rabbitMQConfiguration.password) + ) + metrics.readinessMonitor.enable() + LOGGER.debug("Set RabbitMQ readiness to true") + } catch (e: IOException) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e) + throw IllegalStateException("Failed to create RabbitMQ connection using configuration", e) + } catch (e: TimeoutException) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e) + throw IllegalStateException("Failed to create RabbitMQ connection using configuration", e) + } catch (e: URISyntaxException) { + metrics.readinessMonitor.disable() + LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e) + throw IllegalStateException("Failed to create RabbitMQ connection using configuration", e) + } + connection.addBlockedListener(object : BlockedListener { + @Throws(IOException::class) + override fun handleBlocked(reason: String) { + LOGGER.warn("RabbitMQ blocked connection: {}", reason) + } + + @Throws(IOException::class) + override fun handleUnblocked() { + LOGGER.warn("RabbitMQ unblocked connection") + } + }) + if (connection is Recoverable) { + (connection as Recoverable).apply { + addRecoveryListener(recoveryListener) + } + LOGGER.debug("Recovery listener was added to connection.") + } else { + throw IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it") + } + sizeCheckExecutor.scheduleAtFixedRate( + ::internalLockSendingIfSizeLimitExceeded, + connectionManagerConfiguration.secondsToCheckVirtualPublishLimit.toLong(), // TODO another initial delay? + connectionManagerConfiguration.secondsToCheckVirtualPublishLimit.toLong(), + TimeUnit.SECONDS + ) + } + + override fun close() { + if (connectionIsClosed.getAndSet(true)) { + return + } + val closeTimeout = connectionManagerConfiguration.connectionCloseTimeout + if (connection.isOpen) { + try { + // We close the connection and don't close channels + // because when a channel's connection is closed, so is the channel + connection.close(closeTimeout) + } catch (e: IOException) { + LOGGER.error("Cannot close connection", e) + } + } + shutdownExecutor(sharedExecutor, closeTimeout) + shutdownExecutor(sizeCheckExecutor, closeTimeout) + } + + @Throws(IOException::class) + fun basicPublish(exchange: String, routingKey: String, props: AMQP.BasicProperties?, body: ByteArray?) { + knownExchangesToRoutingKeys.computeIfAbsent(exchange) { mutableSetOf() }.add(routingKey) + getChannelFor(PinId.forRoutingKey(routingKey)).publishWithLocks(exchange, routingKey, props, body) + } + + @Throws(IOException::class) + fun basicConsume( + queue: String, + deliverCallback: DeliverCallback, + cancelCallback: CancelCallback?, + ): SubscriberMonitor { + val holder = getChannelFor(PinId.forQueue(queue)) + val tag = holder.mapWithLock { + it.basicConsume( + queue, + false, + subscriberName + "_" + nextSubscriberId.getAndIncrement(), + { tagTmp, delivery -> + try { + try { + deliverCallback.handle(tagTmp, delivery) + } finally { + holder.withLock { channel -> + basicAck(channel, delivery.envelope.deliveryTag) + } + } + } catch (e: IOException) { + LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.message, e) + } catch (e: RuntimeException) { + LOGGER.error("Cannot handle delivery for tag {}: {}", tagTmp, e.message, e) + } + }, + cancelCallback + ) + } + return SubscriberMonitor { holder.withLock(false) { it.basicCancel(tag) } } + } + + private fun shutdownExecutor(executor: ExecutorService, closeTimeout: Int) { + executor.shutdown() + try { + if (!executor.awaitTermination(closeTimeout.toLong(), TimeUnit.MILLISECONDS)) { + LOGGER.error("Executor is not terminated during {} millis", closeTimeout) + val runnables = executor.shutdownNow() + LOGGER.error("{} task(s) was(were) not finished", runnables.size) + } + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + } + } + + private fun getChannelFor(pinId: PinId) = channelsByPin.computeIfAbsent(pinId) { + LOGGER.trace("Creating channel holder for {}", pinId) + ChannelHolder(::createChannel, ::waitForConnectionRecovery) + } + + private fun createChannel(): Channel { + waitForConnectionRecovery(connection) + return try { + val channel = connection.createChannel() + Objects.requireNonNull(channel) { "No channels are available in the connection. Max channel number: " + connection.channelMax } + channel.basicQos(connectionManagerConfiguration.prefetchCount) + channel.addReturnListener { ret -> + LOGGER.warn("Can not router message to exchange '{}', routing key '{}'. Reply code '{}' and text = {}", + ret.exchange, + ret.routingKey, + ret.replyCode, + ret.replyText) + } + channel + } catch (e: IOException) { + throw IllegalStateException("Can not create channel", e) + } + } + + private fun waitForConnectionRecovery(notifier: ShutdownNotifier, waitForRecovery: Boolean = true) { + if (isConnectionRecovery(notifier)) { + if (waitForRecovery) { + waitForRecovery(notifier) + } else { + LOGGER.warn("Skip waiting for connection recovery") + } + } + check(!connectionIsClosed.get()) { "Connection is already closed" } + } + + private fun waitForRecovery(notifier: ShutdownNotifier) { + LOGGER.warn("Start waiting for connection recovery") + while (isConnectionRecovery(notifier)) { + try { + Thread.sleep(1) + } catch (e: InterruptedException) { + LOGGER.error("Wait for connection recovery was interrupted", e) + break + } + } + LOGGER.info("Stop waiting for connection recovery") + } + + private fun isConnectionRecovery(notifier: ShutdownNotifier): Boolean { + return !notifier.isOpen && !connectionIsClosed.get() + } + + /** + * @param channel pass channel witch used for basicConsume, because delivery tags are scoped per channel, + * deliveries must be acknowledged on the same channel they were received on. + * @throws IOException + */ + @Throws(IOException::class) + private fun basicAck(channel: Channel, deliveryTag: Long) { + channel.basicAck(deliveryTag, false) + } + + fun lockSendingIfSizeLimitExceeded() { + sizeCheckExecutor.submit(::internalLockSendingIfSizeLimitExceeded).get() + } + + private fun internalLockSendingIfSizeLimitExceeded() = try { + val queueNameToSize = client.queues + .associateBy({ it.name }, { it.totalMessages }) + knownExchangesToRoutingKeys.entries.asSequence() + .flatMap { (exchange, routingKeys) -> + client.getBindingsBySource(rabbitMQConfiguration.vHost, exchange).asSequence() + .filter { it.destinationType == DestinationType.QUEUE && routingKeys.contains(it.routingKey) } + } + .groupBy { it.routingKey } + .forEach { (routingKey, bindings) -> + val bindingNameToSize = bindings + .associateBy({ it.destination }, { queueNameToSize.getValue(it.destination) }) + val limit = connectionManagerConfiguration.virtualPublishLimit + val holder = getChannelFor(PinId.forRoutingKey(routingKey)) + LOGGER.trace { "Size limit lock for routing key '$routingKey': ${holder.sizeLimitLock}" } + val sizeDetails = { + bindingNameToSize.entries + .joinToString { "${it.value} message(s) in '${it.key}'" } + } + if (bindingNameToSize.values.sum() > limit) { + if (!holder.sizeLimitLock.isLocked) { + holder.sizeLimitLock.lock() + LOGGER.info { "Sending via routing key '$routingKey' is paused because there are ${sizeDetails()}. Virtual publish limit is $limit" } + } + } else { + if (holder.sizeLimitLock.isLocked) { + holder.sizeLimitLock.unlock() + LOGGER.info { "Sending via routing key '$routingKey' is resumed. There are ${sizeDetails()}. Virtual publish limit is $limit" } + } + } + } + } catch (t: Throwable) { + LOGGER.error("Error during check queue sizes", t) + } + + private class PinId private constructor(routingKey: String?, queue: String?) { + private val routingKey: String? + private val queue: String? + + init { + if (routingKey == null && queue == null) { + throw NullPointerException("Either routingKey or queue must be set") + } + this.routingKey = routingKey + this.queue = queue + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other == null || javaClass != other.javaClass) return false + val pinId = other as PinId + return EqualsBuilder() + .append(routingKey, pinId.routingKey) + .append(queue, pinId.queue) + .isEquals + } + + override fun hashCode(): Int = HashCodeBuilder(17, 37) + .append(routingKey) + .append(queue) + .toHashCode() + + override fun toString(): String = ToStringBuilder(this, ToStringStyle.JSON_STYLE) + .append("routingKey", routingKey) + .append("queue", queue) + .toString() + + companion object { + fun forRoutingKey(routingKey: String): PinId { + return PinId(routingKey, null) + } + + fun forQueue(queue: String): PinId { + return PinId(null, queue) + } + } + } + + private class ChannelHolder( + private val supplier: Supplier, + private val reconnectionChecker: BiConsumer, + ) { + val sizeLimitLock = ReentrantLock() + private val lock: Lock = ReentrantLock() + private var channel: Channel? = null + + @Throws(IOException::class) + fun publishWithLocks(exchange: String, routingKey: String, props: AMQP.BasicProperties?, body: ByteArray?) { + sizeLimitLock.lock() + try { + withLock(true) { it.basicPublish(exchange, routingKey, props, body) } + } finally { + sizeLimitLock.unlock() + } + } + + @Throws(IOException::class) + fun withLock(consumer: (Channel) -> Unit) { + withLock(true, consumer) + } + + @Throws(IOException::class) + fun withLock(waitForRecovery: Boolean, consumer: (Channel) -> Unit) { + lock.lock() + try { + consumer(getChannel(waitForRecovery)) + } finally { + lock.unlock() + } + } + + @Throws(IOException::class) + fun mapWithLock(mapper: (Channel) -> T): T { + lock.lock() + return try { + mapper(getChannel()) + } finally { + lock.unlock() + } + } + + private fun getChannel(): Channel { + return getChannel(true) + } + + private fun getChannel(waitForRecovery: Boolean): Channel { + if (channel == null) { + channel = supplier.get() + } + reconnectionChecker.accept(channel!!, waitForRecovery) + return channel!! + } + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + private const val RABBITMQ_MANAGEMENT_URL = "http://%s:15672/api/" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt index d1baf2502..7251866e7 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -85,7 +85,7 @@ class RabbitMessageGroupBatchRouter : AbstractRabbitRouter() FilterFunction { msg: Message, filters: List -> filterMessage(msg, filters) }, pinName, pinConfig.filters, - connectionManager.configuration.messageRecursionLimit + connectionManager.connectionManagerConfiguration.messageRecursionLimit ) } diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt index 4b48e4510..973bb4f87 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/TestRabbitMessageGroupBatchRouter.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ class TestRabbitMessageGroupBatchRouter { private val connectionConfiguration = ConnectionManagerConfiguration() private val monitor: SubscriberMonitor = mock { } private val connectionManager: ConnectionManager = mock { - on { configuration }.thenReturn(connectionConfiguration) + on { connectionManagerConfiguration }.thenReturn(connectionConfiguration) on { basicConsume(any(), any(), any()) }.thenReturn(monitor) } diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file