Skip to content

Commit

Permalink
Merge pull request #3924 from elbert3/feature-disconnectDevice
Browse files Browse the repository at this point in the history
Feature - Add option to disconnect a device from the broker
elbert3 authored Jan 9, 2024
2 parents 4dd0f73 + 7850853 commit c7159d4
Showing 29 changed files with 734 additions and 144 deletions.
28 changes: 28 additions & 0 deletions assembly/broker-artemis/configurations/locator.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2023, 2022 Eurotech and/or its affiliates and others
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
which is available at https://www.eclipse.org/legal/epl-2.0/
SPDX-License-Identifier: EPL-2.0
Contributors:
Eurotech - initial API and implementation
-->
<!DOCTYPE xml>
<locator-config>
<provided>
<provide>
<interceptor>annotation</interceptor>
<with>Impl</with>
</provide>
</provided>

<packages>
<package>org.eclipse.kapua.commons</package>
<package>org.eclipse.kapua.message</package>
<package>org.eclipse.kapua.service</package>
</packages>
</locator-config>
36 changes: 33 additions & 3 deletions assembly/broker-artemis/descriptors/kapua-broker-artemis.xml
Original file line number Diff line number Diff line change
@@ -89,13 +89,43 @@
<include>org.eclipse.persistence:org.eclipse.persistence.extension</include>
<include>org.eclipse.persistence:javax.persistence</include>

<include>${pom.groupId}:kapua-commons</include>
<include>${pom.groupId}:kapua-client-security</include>
<!-- additional kapua-locator dependencies -->
<include>aopalliance:aopalliance</include>
<include>com.google.guava:failureaccess</include>
<include>com.google.guava:guava</include>
<include>com.google.guava:listenablefuture</include>
<include>com.google.inject.extensions:guice-multibindings</include>
<include>com.google.inject:guice</include>
<include>com.warrenstrange:googleauth</include>
<include>javax.cache:cache-api</include>
<include>javax.inject:javax.inject</include>
<include>org.apache.shiro:shiro-core</include>
<include>org.bitbucket.b_c:jose4j</include>
<include>org.javassist:javassist</include>

<include>${pom.groupId}:kapua-account-api</include>
<include>${pom.groupId}:kapua-account-internal</include>
<include>${pom.groupId}:kapua-broker-artemis-plugin</include>
<include>${pom.groupId}:kapua-client-security</include>
<include>${pom.groupId}:kapua-commons</include>
<include>${pom.groupId}:kapua-device-api</include>
<include>${pom.groupId}:kapua-device-authentication</include>
<include>${pom.groupId}:kapua-device-registry-api</include>
<include>${pom.groupId}:kapua-device-registry-internal</include>
<include>${pom.groupId}:kapua-locator-guice</include>
<include>${pom.groupId}:kapua-message-api</include>
<include>${pom.groupId}:kapua-message-internal</include>
<include>${pom.groupId}:kapua-openid-api</include>
<include>${pom.groupId}:kapua-service-api</include>
<include>${pom.groupId}:kapua-service-client</include>
<include>${pom.groupId}:kapua-security-authentication-api</include>
<include>${pom.groupId}:kapua-security-authorization-api</include>
<include>${pom.groupId}:kapua-security-certificate-api</include>
<include>${pom.groupId}:kapua-security-certificate-internal</include>
<include>${pom.groupId}:kapua-security-shiro</include>
<include>${pom.groupId}:kapua-service-client</include>
<include>${pom.groupId}:kapua-tag-api</include>
<include>${pom.groupId}:kapua-user-api</include>
<include>${pom.groupId}:kapua-user-internal</include>
</includes>
</dependencySet>
</dependencySets>
46 changes: 45 additions & 1 deletion assembly/broker-artemis/pom.xml
Original file line number Diff line number Diff line change
@@ -33,16 +33,40 @@

<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-broker-artemis-plugin</artifactId>
<artifactId>kapua-account-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-account-internal</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-commons</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-broker-artemis-plugin</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-client-security</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-device-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-device-registry-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-device-registry-internal</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-locator-guice</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-service-api</artifactId>
@@ -55,6 +79,26 @@
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-security-authentication-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-security-certificate-internal</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-security-shiro</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-tag-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-user-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.kapua</groupId>
<artifactId>kapua-user-internal</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.persistence</groupId>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*******************************************************************************
* Copyright (c) 2023, 2022 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kapua.broker.artemis.plugin.security;

import java.util.HashMap;
import java.util.Map;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;

import org.eclipse.kapua.commons.util.xml.JAXBContextProvider;
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.eclipse.persistence.jaxb.MarshallerProperties;

public class BrokerJAXBContextProvider implements JAXBContextProvider {

private JAXBContext context;

@Override
public JAXBContext getJAXBContext() {
try {
if (context == null) {
Map<String, Object> properties = new HashMap<>(1);
properties.put(MarshallerProperties.JSON_WRAPPER_AS_ARRAY_NAME, true);
context = JAXBContextFactory.createContext(new Class<?>[]{
// KapuaEvent
ServiceEvent.class,
}, properties);
}
return context;
} catch (JAXBException e) {
throw new ExceptionInInitializerError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -30,10 +30,10 @@
import org.eclipse.kapua.broker.artemis.plugin.security.connector.AcceptorHandler;
import org.eclipse.kapua.broker.artemis.plugin.security.event.BrokerEvent;
import org.eclipse.kapua.broker.artemis.plugin.security.event.BrokerEvent.EventType;
import org.eclipse.kapua.broker.artemis.plugin.security.event.BrokerEventHandler;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.LoginMetric;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.PublishMetric;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.SubscribeMetric;
import org.eclipse.kapua.broker.artemis.plugin.security.event.BrokerEventHanldler;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSetting;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSettingKey;
import org.eclipse.kapua.client.security.AuthErrorCodes;
@@ -42,15 +42,22 @@
import org.eclipse.kapua.client.security.bean.AuthRequest;
import org.eclipse.kapua.client.security.context.SessionContext;
import org.eclipse.kapua.client.security.context.Utils;
import org.eclipse.kapua.commons.core.ServiceModuleBundle;
import org.eclipse.kapua.commons.metric.CommonsMetric;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
import org.eclipse.kapua.commons.util.KapuaDateUtils;
import org.eclipse.kapua.commons.util.xml.XmlUtil;
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.authentication.exception.KapuaAuthenticationErrorCodes;
import org.eclipse.kapua.service.authentication.exception.KapuaAuthenticationException;
import org.eclipse.kapua.service.client.DatabaseCheckUpdate;
import org.eclipse.kapua.service.client.message.MessageConstants;
import org.eclipse.kapua.service.device.connection.listener.DeviceConnectionEventListenerService;
import org.eclipse.kapua.service.device.registry.connection.DeviceConnection;
import org.eclipse.kapua.service.device.registry.connection.DeviceConnectionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,6 +74,7 @@ public class ServerPlugin implements ActiveMQServerPlugin {

private static final int DEFAULT_PUBLISHED_MESSAGE_SIZE_LOG_THRESHOLD = 100000;
private static final String MISSING_TOPIC_SUFFIX = "MQTT.LWT";
private static final String DISCONNECT_EVENT_OPERATION = "disconnect";

enum Failure {
CLOSED,
@@ -102,11 +110,13 @@ public String getAsUrl() {
private PublishMetric publishMetric;
private SubscribeMetric subscribeMetric;

protected BrokerEventHanldler brokerEventHanldler;
protected BrokerEventHandler brokerEventHanldler;
protected AcceptorHandler acceptorHandler;
protected String version;
protected ServerContext serverContext;

protected DeviceConnectionEventListenerService deviceConnectionEventListenerService;

public ServerPlugin() {
//TODO find which is the right plugin to use to set this parameter (ServerPlugin or SecurityPlugin???)
CommonsMetric.module = MetricsSecurityPlugin.BROKER_TELEMETRY;
@@ -117,9 +127,11 @@ public ServerPlugin() {
//TODO find a proper way to initialize database
DatabaseCheckUpdate databaseCheckUpdate = new DatabaseCheckUpdate();
serverContext = ServerContext.getInstance();
brokerEventHanldler = BrokerEventHanldler.getInstance();
brokerEventHanldler = BrokerEventHandler.getInstance();
brokerEventHanldler.registerConsumer((brokerEvent) -> disconnectClient(brokerEvent));
brokerEventHanldler.start();

deviceConnectionEventListenerService = KapuaLocator.getInstance().getService(DeviceConnectionEventListenerService.class);
}

@Override
@@ -132,6 +144,15 @@ public void registered(ActiveMQServer server) {
BrokerSetting.getInstance().getMap(String.class, BrokerSettingKey.ACCEPTORS));
//init acceptors
acceptorHandler.syncAcceptors();

deviceConnectionEventListenerService.addReceiver(serviceEvent -> processDeviceConnectionEvent(serviceEvent));

// Setup service events
ServiceModuleBundle app = KapuaLocator.getInstance().getService(ServiceModuleBundle.class);
app.startup();

// Setup JAXB Context
XmlUtil.setContextProvider(new BrokerJAXBContextProvider());
} catch (Exception e) {
logger.error("Error while initializing {} plugin: {}", this.getClass().getName(), e.getMessage(), e);
}
@@ -320,6 +341,37 @@ private int disconnectClient(String connectionId) {
}).mapToInt(Integer::new).sum();
}

protected void processDeviceConnectionEvent(ServiceEvent event) {
logger.debug("Received event: {}", event);

if(!DISCONNECT_EVENT_OPERATION.equals(event.getOperation())) {
logger.debug("Ignoring event with operation: {}", event.getOperation());
return;
}

try {
DeviceConnection deviceConnection = KapuaLocator.getInstance().getService(DeviceConnectionService.class).find(event.getEntityScopeId(), event.getEntityId());
if(deviceConnection == null) {
logger.warn("DeviceConnection not found - scopeId: {}, id: {} - ", event.getEntityScopeId(), event.getEntityId());
return;
}

String fullClientId = Utils.getFullClientId(deviceConnection.getScopeId(), deviceConnection.getClientId());
SessionContext sessionContext = serverContext.getSecurityContext().getSessionContextByClientId(fullClientId);
if(sessionContext == null) {
logger.info("Did not find any connections to disconnect for clientId: {}", fullClientId);
return;
}

BrokerEvent disconnectEvent = new BrokerEvent(EventType.disconnectClientByConnectionId, sessionContext, sessionContext);

logger.info("Submitting broker event to disconnect clientId: {}, connectionId: {}", fullClientId, sessionContext.getConnectionId());
BrokerEventHandler.getInstance().enqueueEvent(disconnectEvent);
} catch (Exception e) {
logger.warn("Error processing event: {}", e);
}
}

@Override
public void duplicateSessionMetadataFailure(ServerSession session, String key, String data) throws ActiveMQException {
logger.error("Duplicate session for key: {} - data: {}", key, data);
Original file line number Diff line number Diff line change
@@ -14,19 +14,19 @@

import org.eclipse.kapua.commons.localevent.EventHandler;

public class BrokerEventHanldler extends EventHandler<BrokerEvent> {
public class BrokerEventHandler extends EventHandler<BrokerEvent> {

private static BrokerEventHanldler instance;
private static BrokerEventHandler instance;

public static synchronized BrokerEventHanldler getInstance() {
public static synchronized BrokerEventHandler getInstance() {
if (instance==null) {
instance = new BrokerEventHanldler();
instance = new BrokerEventHandler();
}
return instance;
}

private BrokerEventHanldler() {
super(BrokerEventHanldler.class.getName(), 10, 10);
private BrokerEventHandler() {
super(BrokerEventHandler.class.getName(), 10, 10);
}

}
Loading

0 comments on commit c7159d4

Please sign in to comment.