Skip to content

Commit

Permalink
fix(consumers): fix dlq configuration - introduce $SYS namespace for …
Browse files Browse the repository at this point in the history
…internal use

Signed-off-by: riccardomodanese <[email protected]>
  • Loading branch information
riccardomodanese committed Oct 23, 2023
1 parent 390a307 commit 1a0f8d9
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 61 deletions.
21 changes: 8 additions & 13 deletions assembly/broker-artemis/configurations/broker.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>dlq</dead-letter-address>
<expiry-address>expired</expiry-address>
<dead-letter-address>$SYS/dlq</dead-letter-address>
<expiry-address>$SYS/expired</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
Expand All @@ -132,8 +132,8 @@
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>dlq</dead-letter-address>
<expiry-address>expired</expiry-address>
<dead-letter-address>$SYS/dlq</dead-letter-address>
<expiry-address>$SYS/expired</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
Expand Down Expand Up @@ -166,19 +166,14 @@
</broker-plugin>
</broker-plugins>
<addresses>
<address name="dlq">
<address name="$SYS/dlq">
<anycast>
<queue name="dlq"/>
<queue name="$SYS/dlq"/>
</anycast>
</address>
<address name="authDlq">
<address name="$SYS/expired">
<anycast>
<queue name="authDlq"/>
</anycast>
</address>
<address name="expired">
<anycast>
<queue name="expired"/>
<queue name="$SYS/expired"/>
</anycast>
</address>
</addresses>
Expand Down
27 changes: 6 additions & 21 deletions assembly/events-broker/configurations/broker.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,10 @@
</security-settings>

<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<default-address-routing-type>ANYCAST</default-address-routing-type>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<dead-letter-address>dlq</dead-letter-address>
<expiry-address>expired</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
Expand All @@ -144,14 +129,14 @@
</address-settings>

<addresses>
<address name="DLQ">
<address name="dlq">
<anycast>
<queue name="DLQ"/>
<queue name="dlq"/>
</anycast>
</address>
<address name="ExpiryQueue">
<address name="expired">
<anycast>
<queue name="ExpiryQueue"/>
<queue name="expired"/>
</anycast>
</address>
</addresses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ enum Failure {

public static enum MessageType {

ActiveMq("AMQ"),
Broker("BRK"),
Control("CTR"),
Telemetry("TEL");
Telemetry("TEL"),
System("SYS");

private String asUrl;

Expand Down Expand Up @@ -245,10 +246,15 @@ private boolean isLwt(String originalTopic) {
private String getMessgeType(String address) {
if (address!=null) {
if (address.startsWith("active")) {
return MessageType.ActiveMq.getAsUrl();
return MessageType.Broker.getAsUrl();
}
else if (address.startsWith("$")) {
return MessageType.Control.getAsUrl();
if (address.startsWith("$SYS")) {
return MessageType.System.getAsUrl();
}
else {
return MessageType.Control.getAsUrl();
}
}
else {
return MessageType.Telemetry.getAsUrl();
Expand Down
2 changes: 1 addition & 1 deletion consumer/lifecycle-app/src/main/resources/camel/camel.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
</pipeline>
</route>
<route id="dlq">
<from uri="amqp:queue:dlq?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<from uri="amqp:queue:$SYS/dlq?selector=KAPUA_MESSAGE_TYPE='SYS'&amp;asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<pipeline>
<bean ref="kapuaCamelFilter" method="bindSession"/>
<to uri="bean:errorMessageListener?method=processMessage"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<!-- exception/error handler -->
<camel:errorHandler id="mainRouteMessageErrorHandler" redeliveryPolicyRef="kapuaRedeliveryPolicy"
type="DeadLetterChannel"
deadLetterUri="amqp:queue:dlq"
deadLetterUri="amqp:queue:$SYS/dlq"
useOriginalMessage="true">
</camel:errorHandler>
<camel:redeliveryPolicyProfile id="kapuaRedeliveryPolicy" maximumRedeliveries="0" redeliveryDelay="0" retryAttemptedLogLevel="WARN" logRetryAttempted="true"/>
Expand Down
2 changes: 1 addition & 1 deletion consumer/telemetry-app/src/main/resources/camel/camel.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</pipeline>
</route>
<route id="dlq">
<from uri="amqp:queue:dlq?asyncConsumer=true&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<from uri="amqp:queue:$SYS/dlq?selector=KAPUA_MESSAGE_TYPE='SYS'&amp;asyncConsumer=true&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<pipeline>
<bean ref="kapuaCamelFilter" method="bindSession"/>
<to uri="bean:errorMessageListener?method=processMessage"/>
Expand Down
10 changes: 8 additions & 2 deletions service/authentication-app/src/main/resources/camel/camel.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-->
<routes xmlns="http://camel.apache.org/schema/spring">
<route errorHandlerRef="authRouteMessageErrorHandler" id="authRoute">
<from uri="amqp:queue://auth_request?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<from uri="amqp:queue:auth_request?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<pipeline>
<choice id="main">
<when id="doLogin">
Expand All @@ -36,7 +36,13 @@
</pipeline>
</route>
<route id="authDlq">
<from uri="amqp:queue:authDlq?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=1&amp;maxConcurrentConsumers=2"/>
<from uri="amqp:queue:dlq?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=1&amp;maxConcurrentConsumers=2"/>
<pipeline>
<to uri="bean:errorMessageListener?method=processMessage"/>
</pipeline>
</route>
<route id="expired">
<from uri="amqp:queue:expired?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=1&amp;maxConcurrentConsumers=2"/>
<pipeline>
<to uri="bean:errorMessageListener?method=processMessage"/>
</pipeline>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.apache.camel.Processor;
import org.eclipse.kapua.KapuaUnauthenticatedException;
import org.eclipse.kapua.service.camel.application.MetricsCamel;
import org.eclipse.kapua.service.client.message.MessageConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,21 +28,6 @@ public class FailureProcessor implements Processor {

private static final Logger logger = LoggerFactory.getLogger(FailureProcessor.class);

public static enum MessageProcessResult {

Error("ERR");

private String asUrl;

MessageProcessResult(String asUrl) {
this.asUrl = asUrl;
}

public String getAsUrl() {
return asUrl;
}
}

//TODO inject!!!
private MetricsCamel metrics;

Expand All @@ -53,7 +37,6 @@ public FailureProcessor() {

@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(MessageConstants.HEADER_KAPUA_PROCESS_RESULT, MessageProcessResult.Error.getAsUrl());
if (isUnauthenticatedException(exchange)) {
if (logger.isDebugEnabled()) {
logger.debug("Detected unauthenticated error on message processing retry!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ private MessageConstants() {
public static final String HEADER_KAPUA_BROKER_CONTEXT = "KAPUA_BROKER_CONTEXT";
public static final String HEADER_KAPUA_PROCESSING_EXCEPTION = "KAPUA_PROCESSING_EXCEPTION";
public static final String HEADER_KAPUA_MESSAGE_TYPE = "KAPUA_MESSAGE_TYPE";
public static final String HEADER_KAPUA_PROCESS_RESULT = "KAPUA_PROCESS_RESULT";

public static final String HEADER_CAMEL_JMS_HEADER_TIMESTAMP = "JMSTimestamp";
public static final String HEADER_CAMEL_JMS_HEADER_DESTINATION = "JMSDestination";
Expand Down

0 comments on commit 1a0f8d9

Please sign in to comment.