From 1024925d51e0fc41ca1b5d8121bd6752f1f5432e Mon Sep 17 00:00:00 2001 From: andsel Date: Sat, 15 Jun 2024 16:25:27 +0200 Subject: [PATCH] Update session creation to force read access to response topic to the requester client (the one connecting with request-response-information property) --- .../java/io/moquette/broker/Authorizator.java | 31 +++++++++++++++++-- .../io/moquette/broker/MQTTConnection.java | 22 ++++++++++--- .../io/moquette/broker/SessionRegistry.java | 6 ++++ 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/Authorizator.java b/broker/src/main/java/io/moquette/broker/Authorizator.java index 7cc404aee..6f237b1ea 100644 --- a/broker/src/main/java/io/moquette/broker/Authorizator.java +++ b/broker/src/main/java/io/moquette/broker/Authorizator.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.Function; import static io.moquette.broker.Utils.messageId; @@ -37,6 +39,11 @@ final class Authorizator { private final IAuthorizatorPolicy policy; + // Contains the list of topic-client that has read access forced on reply topic. + private ConcurrentMap, Boolean> responseTopicForcedReads = new ConcurrentHashMap<>(); + // Contains the list of requesters' reply topics that need write access by all the other (responders). + private ConcurrentMap responseTopicForcedWrites = new ConcurrentHashMap<>(); + Authorizator(IAuthorizatorPolicy policy) { this.policy = policy; } @@ -113,10 +120,30 @@ private MqttQoS getQoSCheckingAlsoPermissionsOnTopic(String clientID, String use * @return true if the user from client can publish data on topic. */ boolean canWrite(Topic topic, String user, String client) { - return policy.canWrite(topic, user, client); + boolean policyResult = policy.canWrite(topic, user, client); + if (!policyResult && responseTopicForcedWrites.containsKey(topic)) { + LOG.warn("Found write discord by policy and response information topic configured. The policy prohibit " + + "while the response topic should be accessible for all to write. topic: {}", topic); + return true; + } + return policyResult; } boolean canRead(Topic topic, String user, String client) { - return policy.canRead(topic, user, client); + boolean policyResult = policy.canRead(topic, user, client); + if (!policyResult && responseTopicForcedReads.containsKey(Utils.Couple.of(topic, client))) { + LOG.warn("Found read discord by policy and response information topic configured. The policy prohibit " + + "while the response topic should be accessible by read from client{}. topic: {}", client, topic); + return true; + } + return policyResult; + } + + void forceReadAccess(Topic topic, String client) { + responseTopicForcedReads.putIfAbsent(Utils.Couple.of(topic, client), true); + } + + public void forceWriteToAll(Topic topic) { + responseTopicForcedWrites.putIfAbsent(topic, true); } } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 37453103f..df8cfd22f 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -281,7 +281,13 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser .sessionPresent(isSessionAlreadyPresent); if (isProtocolVersion(msg, MqttVersion.MQTT_5)) { // set properties for MQTT 5 - final MqttProperties ackProperties = prepareConnAckProperties(serverGeneratedClientId, clientId); + ConnAckPropertiesBuilder connAckPropertiesBuilder = prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId); + if (isNeedResponseInformation(msg)) { + // the responder and requested access to the topic are already configured during session creation + // in SessionRegistry + connAckPropertiesBuilder.responseInformation("/reqresp/response/" + clientId); + } + final MqttProperties ackProperties = connAckPropertiesBuilder.build(); connAckBuilder.properties(ackProperties); } final MqttConnAckMessage ackMessage = connAckBuilder.build(); @@ -328,6 +334,16 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } + /** + * @return true iff message contains property REQUEST_RESPONSE_INFORMATION and is positive. + * */ + static boolean isNeedResponseInformation(MqttConnectMessage msg) { + MqttProperties.IntegerProperty requestRespInfo = (MqttProperties.IntegerProperty) msg.variableHeader() + .properties() + .getProperty(MqttProperties.MqttPropertyType.REQUEST_RESPONSE_INFORMATION.value()); + return requestRespInfo != null && requestRespInfo.value() >= 1; + } + /** * @return the value of the Payload Format Indicator property from Will specification. * */ @@ -352,10 +368,6 @@ private static boolean checkUTF8Validity(byte[] rawBytes) { return true; } - private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) { - return prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId).build(); - } - private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverGeneratedClientId, String clientId) { final ConnAckPropertiesBuilder builder = new ConnAckPropertiesBuilder(); // default maximumQos is 2, [MQTT-3.2.2-10] diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 6c26641f2..e6520971d 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -395,6 +395,12 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) { newSession = new Session(sessionData, clean, queue); newSession.markConnecting(); sessionsRepository.saveSession(sessionData); + if (MQTTConnection.isNeedResponseInformation(msg)) { + // the responder client must have write access to this topic + // the requester client must have read access on this topic + authorizator.forceReadAccess(Topic.asTopic("/reqresp/response/" + clientId), clientId); + authorizator.forceWriteToAll(Topic.asTopic("/reqresp/response/" + clientId)); + } return newSession; }