Skip to content

Commit

Permalink
fix(consumers): introduce $SYS prefix for events and services
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <[email protected]>
  • Loading branch information
riccardomodanese committed Oct 23, 2023
1 parent 1a0f8d9 commit f43d610
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 42 deletions.
27 changes: 6 additions & 21 deletions assembly/broker-artemis/configurations/broker.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,10 @@
</security-settings>

<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>$SYS/dlq</dead-letter-address>
<expiry-address>$SYS/expired</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<default-address-routing-type>ANYCAST</default-address-routing-type>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>$SYS/dlq</dead-letter-address>
<expiry-address>$SYS/expired</expiry-address>
<dead-letter-address>$SYS/dlq/default</dead-letter-address>
<expiry-address>$SYS/dlq/expired</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
Expand Down Expand Up @@ -166,14 +151,14 @@
</broker-plugin>
</broker-plugins>
<addresses>
<address name="$SYS/dlq">
<address name="$SYS/dlq/default">
<anycast>
<queue name="$SYS/dlq"/>
<queue name="$SYS/dlq/default"/>
</anycast>
</address>
<address name="$SYS/expired">
<address name="$SYS/dlq/expired">
<anycast>
<queue name="$SYS/expired"/>
<queue name="$SYS/dlq/expired"/>
</anycast>
</address>
</addresses>
Expand Down
13 changes: 6 additions & 7 deletions assembly/events-broker/configurations/broker.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@
</security-settings>

<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>dlq</dead-letter-address>
<expiry-address>expired</expiry-address>
<dead-letter-address>$SYS/svc/dlq/default</dead-letter-address>
<expiry-address>$SYS/svc/dlq/expired</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
Expand All @@ -129,14 +128,14 @@
</address-settings>

<addresses>
<address name="dlq">
<address name="$SYS/svc/dlq/default">
<anycast>
<queue name="dlq"/>
<queue name="$SYS/svc/dlq/default" />
</anycast>
</address>
<address name="expired">
<address name="$SYS/svc/dlq/expired">
<anycast>
<queue name="expired"/>
<queue name="$SYS/svc/dlq/expired" />
</anycast>
</address>
</addresses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ public class ServiceClientMessagingImpl implements ServiceClient {

private static final Logger logger = LoggerFactory.getLogger(ServiceClientMessagingImpl.class);

public static final String REQUEST_QUEUE = "auth_request";
public static final String RESPONSE_QUEUE_PATTERN = "auth_response_%s_%s";
public static final String REQUEST_QUEUE = "$SYS/svc/ath/request";
public static final String RESPONSE_QUEUE_PATTERN = "$SYS/svc/ath/response/%s_%s";

private static final int TIMEOUT = 5000;

private Client client;

public ServiceClientMessagingImpl(String clusterName, String requester) {
//TODO change configuration (use service event broker for now)
String clientId = "auth-" + UUID.randomUUID().toString();
String clientId = "svc-ath-" + UUID.randomUUID().toString();

Check warning on line 52 in client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java#L52

Added line #L52 was not covered by tests
String host = SystemSetting.getInstance().getString(SystemSettingKey.SERVICE_BUS_HOST, "events-broker");
int port = SystemSetting.getInstance().getInt(SystemSettingKey.SERVICE_BUS_PORT, 5672);
String username = SystemSetting.getInstance().getString(SystemSettingKey.SERVICE_BUS_USERNAME, "username");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ void publish(String address, ServiceEvent kapuaEvent)
synchronized void subscribe(Subscription subscription)
throws ServiceEventBusException {
try {
String subscriptionStr = String.format("events.%s", subscription.getAddress());
String subscriptionStr = String.format("$SYS/svc/evt/%s", subscription.getAddress());

Check warning on line 302 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L302

Added line #L302 was not covered by tests
// create a bunch of sessions to allow parallel event processing
LOGGER.info("Subscribing to address {} - name {} ...", subscriptionStr, subscription.getName());
for (int i = 0; i < CONSUMER_POOL_SIZE; i++) {
Expand Down Expand Up @@ -345,7 +345,7 @@ private class Sender {
private MessageProducer jmsProducer;

public Sender(Connection jmsConnection, String address) throws JMSException {
address = String.format("events.%s", address);
address = String.format("$SYS/svc/evt/%s", address);

Check warning on line 348 in commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java

View check run for this annotation

Codecov / codecov/patch

commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java#L348

Added line #L348 was not covered by tests
jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = jmsSession.createTopic(address);
jmsProducer = jmsSession.createProducer(jmsTopic);
Expand Down
2 changes: 1 addition & 1 deletion consumer/lifecycle-app/src/main/resources/camel/camel.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
</pipeline>
</route>
<route id="dlq">
<from uri="amqp:queue:$SYS/dlq?selector=KAPUA_MESSAGE_TYPE='SYS'&amp;asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<from uri="amqp:queue:$SYS/dlq/default?selector=KAPUA_MESSAGE_TYPE='SYS'&amp;asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<pipeline>
<bean ref="kapuaCamelFilter" method="bindSession"/>
<to uri="bean:errorMessageListener?method=processMessage"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<!-- exception/error handler -->
<camel:errorHandler id="mainRouteMessageErrorHandler" redeliveryPolicyRef="kapuaRedeliveryPolicy"
type="DeadLetterChannel"
deadLetterUri="amqp:queue:$SYS/dlq"
deadLetterUri="amqp:queue:$SYS/dlq/default"
useOriginalMessage="true">
</camel:errorHandler>
<camel:redeliveryPolicyProfile id="kapuaRedeliveryPolicy" maximumRedeliveries="0" redeliveryDelay="0" retryAttemptedLogLevel="WARN" logRetryAttempted="true"/>
Expand Down
2 changes: 1 addition & 1 deletion consumer/telemetry-app/src/main/resources/camel/camel.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</pipeline>
</route>
<route id="dlq">
<from uri="amqp:queue:$SYS/dlq?selector=KAPUA_MESSAGE_TYPE='SYS'&amp;asyncConsumer=true&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<from uri="amqp:queue:$SYS/dlq/default?selector=KAPUA_MESSAGE_TYPE='SYS'&amp;asyncConsumer=true&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<pipeline>
<bean ref="kapuaCamelFilter" method="bindSession"/>
<to uri="bean:errorMessageListener?method=processMessage"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<camel:camelContext id="telemetryContext" depends-on="application">
<camel:errorHandler id="messageErrorHandler" redeliveryPolicyRef="kapuaErrorRedeliveryPolicy"
type="DeadLetterChannel"
deadLetterUri="amqp:queue:dlq"
deadLetterUri="amqp:queue:$SYS/dlq/default"
useOriginalMessage="true">
</camel:errorHandler>
<camel:redeliveryPolicyProfile id="kapuaErrorRedeliveryPolicy" maximumRedeliveries="0" redeliveryDelay="0" retryAttemptedLogLevel="WARN" logRetryAttempted="true" />
Expand Down
8 changes: 4 additions & 4 deletions service/authentication-app/src/main/resources/camel/camel.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-->
<routes xmlns="http://camel.apache.org/schema/spring">
<route errorHandlerRef="authRouteMessageErrorHandler" id="authRoute">
<from uri="amqp:queue:auth_request?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<from uri="amqp:queue:$SYS/svc/ath/request?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=2&amp;maxConcurrentConsumers=5"/>
<pipeline>
<choice id="main">
<when id="doLogin">
Expand All @@ -32,17 +32,17 @@
<bean ref="authenticationServiceListener" method="getEntity"/>
</when>
</choice>
<toD uri="amqp:queue:auth_response_${header.cluster_name}_${header.requester}"/>
<toD uri="amqp:queue:$SYS/svc/ath/response/${header.cluster_name}_${header.requester}"/>
</pipeline>
</route>
<route id="authDlq">
<from uri="amqp:queue:dlq?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=1&amp;maxConcurrentConsumers=2"/>
<from uri="amqp:queue:$SYS/svc/dlq/default?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=1&amp;maxConcurrentConsumers=2"/>
<pipeline>
<to uri="bean:errorMessageListener?method=processMessage"/>
</pipeline>
</route>
<route id="expired">
<from uri="amqp:queue:expired?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=1&amp;maxConcurrentConsumers=2"/>
<from uri="amqp:queue:$SYS/svc/dlq/expired?asyncConsumer=false&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;transacted=false&amp;concurrentConsumers=1&amp;maxConcurrentConsumers=2"/>
<pipeline>
<to uri="bean:errorMessageListener?method=processMessage"/>
</pipeline>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
<!-- exception/error handler -->
<camel:errorHandler id="authRouteMessageErrorHandler" redeliveryPolicyRef="kapuaRedeliveryPolicy"
type="DeadLetterChannel"
deadLetterUri="amqp:queue:authDlq"
deadLetterUri="amqp:queue:$SYS/svc/dlq/default"
useOriginalMessage="true">
</camel:errorHandler>
<camel:redeliveryPolicyProfile id="kapuaRedeliveryPolicy" maximumRedeliveries="0" redeliveryDelay="0" retryAttemptedLogLevel="WARN" logRetryAttempted="true"/>
Expand Down

0 comments on commit f43d610

Please sign in to comment.