[th2-2552] backpressure: added check for queue size limit #184

Open
wants to merge 12 commits into base: master
43 changes: 24 additions & 19 deletions README.md
@@ -62,7 +62,7 @@ Then you will create an instance of imported class, by choosing one of the follo

### Configuration formats

The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
The `CommonFactory` reads a RabbitMQ configuration from the `rabbitMQ.json` file.
* host - the required setting defines the RabbitMQ host.
* vHost - the required setting defines the virtual host that will be used for connecting to RabbitMQ.
Please see more details about the virtual host in RabbitMQ via [link](https://www.rabbitmq.com/vhosts.html)
@@ -72,16 +72,6 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
* password - the required setting defines the password that will be used for connecting to RabbitMQ.
* exchangeName - the required setting defines the exchange that will be used for sending/subscribing operation in MQ routers.
Please see more details about the exchanges in RabbitMQ via [link](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges)
* connectionTimeout - the connection TCP establishment timeout in milliseconds with its default value set to 60000. Use zero for infinite waiting.
* connectionCloseTimeout - the timeout in milliseconds for completing all the close-related operations, use -1 for infinity, the default value is set to 10000.
* maxRecoveryAttempts - this option defines the number of reconnection attempts to RabbitMQ, with its default value set to 5.
The `th2_readiness` probe is set to false and publishers are blocked after a lost connection to RabbitMQ. The `th2_readiness` probe is reverted to true if the connection will be recovered during specified attempts otherwise the `th2_liveness` probe will be set to false.
* minConnectionRecoveryTimeout - this option defines a minimal interval in milliseconds between reconnect attempts, with its default value set to 10000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10.
* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100
* secondsToCheckVirtualQueueLimit - this option defines an interval in seconds between size check attempts, default = 10
* batchesToCheckVirtualQueueLimit - this option defines the number of batches between size check attempts, default = 10000

```json
{
@@ -90,16 +80,34 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
"port": 5672,
"username": "<user name>",
"password": "<password>",
"exchangeName": "<exchange name>",
"exchangeName": "<exchange name>"
}
```

The `CommonFactory` reads a RabbitMQ connection configuration from the `mq_router.json` file.
* subscriberName - the client-generated consumer tag to establish context, default = (`rabbit_mq_subscriber.` + current time in milliseconds)
* connectionTimeout - the connection TCP establishment timeout in milliseconds with its default value set to 60000. Use zero for infinite waiting.
* connectionCloseTimeout - the timeout in milliseconds for completing all the close-related operations, use -1 for infinity, the default value is set to 10000.
* maxRecoveryAttempts - this option defines the number of reconnection attempts to RabbitMQ, with its default value set to 5.
The `th2_readiness` probe is set to false and publishers are blocked after a lost connection to RabbitMQ. The `th2_readiness` probe is reverted to true if the connection will be recovered during specified attempts otherwise the `th2_liveness` probe will be set to false.
* minConnectionRecoveryTimeout - this option defines a minimal interval in milliseconds between reconnect attempts, with its default value set to 10000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with its default value set to 60000. Common factory increases the reconnect interval values from minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if unlimited, the default value is set to 10.
* messageRecursionLimit - an integer number denotes how deep nested protobuf message might be, default = 100
* virtualPublishLimit - MQ router calculates destination queues and compares their current size to this value. The router blocks the current thread to repeat the comparison if the size of any destination queues exceeds the virtual limit
* secondsToCheckVirtualPublishLimit - this option defines an interval in seconds between size check attempts, default = 10
```json
{
"subscriberName": "<subscriber name>",
"connectionTimeout": 60000,
"connectionCloseTimeout": 10000,
"maxRecoveryAttempts": 5,
"minConnectionRecoveryTimeout": 10000,
"maxConnectionRecoveryTimeout": 60000,
"prefetchCount": 10,
"messageRecursionLimit": 100,
"secondsToCheckVirtualQueueLimit": 10,
"batchesToCheckVirtualQueueLimit": 10000
"virtualPublishLimit": 10000,
"secondsToCheckVirtualPublishLimit": 10
}
```
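The `virtualPublishLimit` comparison described above can be sketched as follows. This is a minimal illustration rather than the code from this PR: the class and method names are hypothetical, queue sizes are passed in as a plain map instead of being fetched from RabbitMQ, and "exceeded" is taken to mean that any single destination queue is larger than the limit, as the README wording suggests.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: none of these names come from th2 common.
final class VirtualPublishLimitSketch {

    /** Returns true if any destination queue holds more messages than the limit. */
    static boolean isExceeded(Map<String, Long> queueSizes, long virtualPublishLimit) {
        return queueSizes.values().stream().anyMatch(size -> size > virtualPublishLimit);
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("queue-a", 120L);
        sizes.put("queue-b", 10_500L);
        // queue-b is above the limit, so publishing would be paused.
        System.out.println(isExceeded(sizes, 10_000L)); // prints "true"

        sizes.put("queue-b", 9_000L);
        // Every queue is now at or below the limit, so publishing would resume.
        System.out.println(isExceeded(sizes, 10_000L)); // prints "false"
    }
}
```

A router following this idea would repeat the check every `secondsToCheckVirtualPublishLimit` seconds and keep the publishing thread blocked while the result stays true.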

@@ -121,7 +129,6 @@ The `CommonFactory` reads a message's router configuration from the `mq.json` fi
* filters - pin's message's filters
* metadata - a metadata filters
* message - a message's fields filters
* virtualQueueLimit - MQ router calculates destination queues and compares their current size to this value. The router blocks the current thread to repeat the comparison if the size of any destination queues exceeds the virtual limit

Filters format:
* fieldName - a field's name
@@ -159,8 +166,7 @@ Filters format:
"operation": "WILDCARD"
}
]
},
"virtualQueueLimit": 10000
}
}
}
}
@@ -297,8 +303,7 @@ dependencies {
### 3.34.0

+ Added backpressure support: lock sending if queue virtual size limit is exceeded
+ Added parameter `virtualQueueLimit` to `mq.json`
+ Added parameters `secondsToCheckVirtualQueueLimit` and `batchesToCheckVirtualQueueLimit` to `mq_router.json`
+ Added parameters `virtualPublishLimit` and `secondsToCheckVirtualPublishLimit` to `mq_router.json`

### 3.33.0

@@ -645,12 +645,7 @@ protected PrometheusConfiguration loadPrometheusConfiguration() {
}

protected ConnectionManager createRabbitMQConnectionManager() {
return new ConnectionManager(
getRabbitMqConfiguration(),
getConnectionManagerConfiguration(),
getMessageRouterConfiguration(),
livenessMonitor::disable
);
return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration(), livenessMonitor::disable);
}

protected ConnectionManager getRabbitMqConnectionManager() {
@@ -91,8 +91,8 @@ public void send(T value) throws IOException {
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
.inc();
sentBeforeQueueSizeCheck++;
if (sentBeforeQueueSizeCheck > connectionManager.getConnectionManagerConfiguration().getBatchesToCheckVirtualQueueLimit()) {
connectionManager.lockSendingIfSizeLimitExceeded(routingKey.get());
if (sentBeforeQueueSizeCheck > connectionManager.getConnectionManagerConfiguration().getVirtualPublishLimit()) {
connectionManager.lockSendingIfSizeLimitExceeded();
sentBeforeQueueSizeCheck = 0;
}
connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
@@ -16,8 +16,6 @@

import com.exactpro.th2.common.metrics.HealthMetrics;
import com.exactpro.th2.common.schema.message.SubscriberMonitor;
import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration;
import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -52,7 +50,6 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -75,8 +72,8 @@

import static com.rabbitmq.http.client.domain.DestinationType.QUEUE;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

public class ConnectionManager implements AutoCloseable {
@@ -94,10 +91,9 @@ public class ConnectionManager implements AutoCloseable {
private final ExecutorService sharedExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("rabbitmq-shared-pool-%d")
.build());
private final Map<String, Long> queueNameToVirtualQueueLimit;
private final Client client;
private final ScheduledExecutorService sizeCheckExecutor = Executors.newScheduledThreadPool(1);
private final Set<String> knownExchanges = Collections.synchronizedSet(new HashSet<>());
private final Map<String, Set<String>> knownExchangesToRoutingKeys = new ConcurrentHashMap<>();

private final HealthMetrics metrics = new HealthMetrics(this);

@@ -121,20 +117,10 @@ public ConnectionManagerConfiguration getConnectionManagerConfiguration() {
public ConnectionManager(
@NotNull RabbitMQConfiguration rabbitMQConfiguration,
@NotNull ConnectionManagerConfiguration connectionManagerConfiguration,
@NotNull MessageRouterConfiguration messageRouterConfiguration,
Runnable onFailedRecoveryConnection
) {
this.rabbitMQConfiguration = requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
this.connectionManagerConfiguration = requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");
queueNameToVirtualQueueLimit = requireNonNull(messageRouterConfiguration, "Message router configuration can not be null")
.getQueues()
.values()
.stream()
.collect(toMap(
QueueConfiguration::getQueue,
QueueConfiguration::getVirtualQueueLimit,
Math::min // TODO is it valid situation if there are several configurations for one queue?
));

String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), rabbitMQConfiguration.getSubscriberName());
if (StringUtils.isBlank(subscriberNameTmp)) {
@@ -291,8 +277,8 @@ public void handleUnblocked() throws IOException {

sizeCheckExecutor.scheduleAtFixedRate(
this::lockSendingIfSizeLimitExceeded,
connectionManagerConfiguration.getSecondsToCheckVirtualQueueLimit(), // TODO another initial delay?
connectionManagerConfiguration.getSecondsToCheckVirtualQueueLimit(),
connectionManagerConfiguration.getSecondsToCheckVirtualPublishLimit(), // TODO another initial delay?
connectionManagerConfiguration.getSecondsToCheckVirtualPublishLimit(),
TimeUnit.SECONDS
);
}
@@ -323,7 +309,7 @@ public void close() {
}

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
knownExchanges.add(exchange);
knownExchangesToRoutingKeys.computeIfAbsent(exchange, e -> new HashSet<>()).add(routingKey);
getChannelFor(PinId.forRoutingKey(routingKey)).publishWithLocks(exchange, routingKey, props, body);
}

@@ -429,74 +415,61 @@ private void basicAck(Channel channel, long deliveryTag) throws IOException {
channel.basicAck(deliveryTag, false);
}

public void lockSendingIfSizeLimitExceeded(String routingKey) {
lockSendingIfSizeLimitExceeded(List.of(routingKey));
}

private void lockSendingIfSizeLimitExceeded() {
lockSendingIfSizeLimitExceeded(
channelsByPin.keySet().stream()
.filter(channelHolder -> channelHolder.routingKey != null)
.map(channelHolder -> channelHolder.routingKey)
.collect(Collectors.toList())
);
}

private void lockSendingIfSizeLimitExceeded(List<String> knownRoutingKeys) {
public void lockSendingIfSizeLimitExceeded() {
try {
for (var routingKeyToQueues : groupQueuesByRoutingKey(knownRoutingKeys).entrySet()) {
String routingKey = routingKeyToQueues.getKey();
Map<Boolean, List<QueueInfoWithVirtualLimit>> isExceededToQueues = routingKeyToQueues.getValue().stream()
.collect(partitioningBy(QueueInfoWithVirtualLimit::isExceeded));
groupQueuesByRoutingKey().forEach((routingKey, queuesWithVirtualPublishLimit) -> {
ChannelHolder holder = getChannelFor(PinId.forRoutingKey(routingKey));
List<QueueInfoWithVirtualLimit> exceededQueues = isExceededToQueues.get(true);
if (exceededQueues.isEmpty()) {
if (holder.sizeLimitLock.isLocked()) {
holder.sizeLimitLock.unlock();
if (queuesWithVirtualPublishLimit.isExceeded()) {
if (!holder.sizeLimitLock.isLocked()) {
holder.sizeLimitLock.lock();
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Sending via routing key '{}' is resumed. There are {}",
"Sending via routing key '{}' is paused because there are {}",
routingKey,
isExceededToQueues.get(false).stream().map(QueueInfoWithVirtualLimit::getSizeDetails).collect(joining(", "))
queuesWithVirtualPublishLimit.getSizeDetails()
);
}
}
} else {
if (!holder.sizeLimitLock.isLocked()) {
holder.sizeLimitLock.lock();
if (holder.sizeLimitLock.isLocked()) {
holder.sizeLimitLock.unlock();
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Sending via routing key '{}' is paused because there are {}",
"Sending via routing key '{}' is resumed. There are {}",
routingKey,
exceededQueues.stream().map(QueueInfoWithVirtualLimit::getSizeDetails).collect(joining(", "))
queuesWithVirtualPublishLimit.getSizeDetails()
);
}
}
}
}
});
} catch (Throwable t) {
LOGGER.error("Error during check queue sizes", t);
}
}

private Map<String, List<QueueInfoWithVirtualLimit>> groupQueuesByRoutingKey(List<String> knownRoutingKeys) {
private Map<String, QueuesWithVirtualPublishLimit> groupQueuesByRoutingKey() {
List<BindingInfo> bindings = new ArrayList<>();
knownExchanges.forEach(exchange -> bindings.addAll(
knownExchangesToRoutingKeys.forEach((exchange, routingKeys) -> bindings.addAll(
client.getBindingsBySource(rabbitMQConfiguration.getVHost(), exchange).stream()
.filter(it -> it.getDestinationType() == QUEUE && knownRoutingKeys.contains(it.getRoutingKey()))
.filter(it -> it.getDestinationType() == QUEUE && routingKeys.contains(it.getRoutingKey()))
.collect(Collectors.toList())
));
Map<String, QueueInfo> queueNameToInfo = client.getQueues().stream()
.collect(toMap(QueueInfo::getName, Function.identity()));
Map<String, List<QueueInfoWithVirtualLimit>> routingKeyToQueues = new HashMap<>();
bindings.forEach(bindingInfo -> routingKeyToQueues
.computeIfAbsent(bindingInfo.getRoutingKey(), s -> new ArrayList<>())
.add(new QueueInfoWithVirtualLimit(
queueNameToInfo.get(bindingInfo.getDestination()),
queueNameToVirtualQueueLimit.get(bindingInfo.getDestination())
))
);
return routingKeyToQueues;
Map<String, QueuesWithVirtualPublishLimit> routingKeyToQueuesWithLimit = new HashMap<>();
bindings.stream()
.collect(groupingBy(BindingInfo::getRoutingKey))
.forEach((routingKey, bindingsForRoutingKey) ->
routingKeyToQueuesWithLimit.put(
routingKey,
new QueuesWithVirtualPublishLimit(
bindingsForRoutingKey.stream().map(bindingInfo -> queueNameToInfo.get(bindingInfo.getDestination())).collect(toList()),
connectionManagerConfiguration.getVirtualPublishLimit()
)
)
);
return routingKeyToQueuesWithLimit;
}

private Map<String, QueuesWithVirtualPublishLimit> groupQueuesByRoutingKey() {
    Map<String, QueueInfo> queueNameToInfo = client.getQueues().stream()
            .collect(toMap(QueueInfo::getName, Function.identity()));

    Map<String, List<BindingInfo>> bindings = knownExchangesToRoutingKeys.entrySet().stream()
            .flatMap(entry -> {
                String exchange = entry.getKey();
                Set<String> routingKeys = entry.getValue();
                return client.getBindingsBySource(rabbitMQConfiguration.getVHost(), exchange).stream()
                        .filter(it -> it.getDestinationType() == QUEUE && routingKeys.contains(it.getRoutingKey()));
            })
            .collect(groupingBy(BindingInfo::getRoutingKey));

    Map<String, QueuesWithVirtualPublishLimit> routingKeyToQueuesWithLimit = new HashMap<>();
    bindings.forEach((routingKey, bindingsForRoutingKey) ->
            routingKeyToQueuesWithLimit.put(
                    routingKey,
                    new QueuesWithVirtualPublishLimit(
                            bindingsForRoutingKey.stream()
                                    .map(bindingInfo -> queueNameToInfo.get(bindingInfo.getDestination()))
                                    .collect(toList()),
                            connectionManagerConfiguration.getVirtualPublishLimit()
                    )
            )
    );
    return routingKeyToQueuesWithLimit;
}

I think we can combine lockSendingIfSizeLimitExceeded and groupQueuesByRoutingKey into a single continuous stream or a Kotlin sequence.

Contributor Author

I didn't want to do that. I wanted to keep all the logic that uses com.rabbitmq.http.client.Client in one place and do only the locking in lockSendingIfSizeLimitExceeded().
But I'll try it if you want.

Contributor Author

Please see commit 18469f3.

Contributor Author

I migrated ConnectionManager to Kotlin. Please see commit fbae68d.
But I don't think we can do it in one stream, because we need all the queues for each routing key first. I mean, we can't do anything before checking all the queues.
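
The constraint mentioned above can be illustrated with a small, self-contained sketch. It is not the code from either commit: `Binding` is a hypothetical stand-in for the RabbitMQ binding type, queue sizes come from a plain map, and a routing key is assumed to be "exceeded" when any of its queues is above the limit. The point is that the grouping step has to consume every binding before any per-routing-key decision can be made.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch; Binding and the method below are illustrative, not th2 or RabbitMQ client API.
final class GroupThenCheckSketch {

    /** Minimal stand-in for a binding: one routing key pointing at one queue. */
    static final class Binding {
        final String routingKey;
        final String queue;

        Binding(String routingKey, String queue) {
            this.routingKey = routingKey;
            this.queue = queue;
        }

        String routingKey() {
            return routingKey;
        }
    }

    static Map<String, Boolean> exceededByRoutingKey(
            List<Binding> bindings, Map<String, Long> queueSizes, long limit) {
        // Step 1: groupingBy is a terminal operation, so all bindings are consumed here.
        Map<String, List<Binding>> byRoutingKey = bindings.stream()
                .collect(Collectors.groupingBy(Binding::routingKey));

        // Step 2: only now is the full queue list of each routing key known.
        Map<String, Boolean> exceeded = new TreeMap<>();
        byRoutingKey.forEach((routingKey, group) -> exceeded.put(
                routingKey,
                group.stream().anyMatch(b -> queueSizes.getOrDefault(b.queue, 0L) > limit)));
        return exceeded;
    }

    public static void main(String[] args) {
        List<Binding> bindings = List.of(
                new Binding("key-1", "queue-a"),
                new Binding("key-2", "queue-b"),
                new Binding("key-1", "queue-c"));
        Map<String, Long> sizes = Map.of("queue-a", 10L, "queue-b", 20L, "queue-c", 15_000L);
        // key-1 is bound to queue-c, which is over the limit; key-2 is not.
        System.out.println(exceededByRoutingKey(bindings, sizes, 10_000L)); // prints "{key-1=true, key-2=false}"
    }
}
```

Whether the two steps are written as one chained expression or two statements, the grouping still acts as a barrier, which matches the reasoning given in the comment.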


private static class RabbitMqSubscriberMonitor implements SubscriberMonitor {
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -42,8 +42,7 @@ data class QueueConfiguration(
@JsonProperty(required = true) @JsonAlias("labels", "tags") var attributes: List<String> = emptyList(),
var filters: List<MqRouterFilterConfiguration> = emptyList(),
@JsonProperty(value = "read") var isReadable: Boolean = true,
@JsonProperty(value = "write") var isWritable: Boolean = true,
@JsonProperty var virtualQueueLimit: Long = 10000,
@JsonProperty(value = "write") var isWritable: Boolean = true
) : Configuration()

data class MqRouterFilterConfiguration(
@@ -38,6 +38,6 @@ data class ConnectionManagerConfiguration(
var maxConnectionRecoveryTimeout: Int = 60000,
val prefetchCount: Int = 10,
val messageRecursionLimit: Int = 100,
val secondsToCheckVirtualQueueLimit: Int = 10,
val batchesToCheckVirtualQueueLimit: Int = 10000,
var virtualPublishLimit: Long = 10000,
val secondsToCheckVirtualPublishLimit: Int = 10,
) : Configuration()