Skip to content

Commit

Permalink
Update session creation to force read access to response topic to the…
Browse files Browse the repository at this point in the history
… requester client (the one connecting with request-response-information property)
  • Loading branch information
andsel committed Jun 15, 2024
1 parent 7006610 commit 1024925
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
31 changes: 29 additions & 2 deletions broker/src/main/java/io/moquette/broker/Authorizator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import static io.moquette.broker.Utils.messageId;
Expand All @@ -37,6 +39,11 @@ final class Authorizator {

private final IAuthorizatorPolicy policy;

// Contains the list of topic-client that has read access forced on reply topic.
private ConcurrentMap<Utils.Couple<Topic, String>, Boolean> responseTopicForcedReads = new ConcurrentHashMap<>();
// Contains the list of requesters' reply topics that need write access by all the other (responders).
private ConcurrentMap<Topic, Boolean> responseTopicForcedWrites = new ConcurrentHashMap<>();

Authorizator(IAuthorizatorPolicy policy) {
this.policy = policy;
}
Expand Down Expand Up @@ -113,10 +120,30 @@ private MqttQoS getQoSCheckingAlsoPermissionsOnTopic(String clientID, String use
* @return true if the user from client can publish data on topic.
*/
boolean canWrite(Topic topic, String user, String client) {
return policy.canWrite(topic, user, client);
boolean policyResult = policy.canWrite(topic, user, client);
if (!policyResult && responseTopicForcedWrites.containsKey(topic)) {
LOG.warn("Found write discord by policy and response information topic configured. The policy prohibit " +
"while the response topic should be accessible for all to write. topic: {}", topic);
return true;
}
return policyResult;
}

boolean canRead(Topic topic, String user, String client) {
return policy.canRead(topic, user, client);
boolean policyResult = policy.canRead(topic, user, client);
if (!policyResult && responseTopicForcedReads.containsKey(Utils.Couple.of(topic, client))) {
LOG.warn("Found read discord by policy and response information topic configured. The policy prohibit " +
"while the response topic should be accessible by read from client{}. topic: {}", client, topic);
return true;
}
return policyResult;
}

void forceReadAccess(Topic topic, String client) {
responseTopicForcedReads.putIfAbsent(Utils.Couple.of(topic, client), true);
}

public void forceWriteToAll(Topic topic) {
responseTopicForcedWrites.putIfAbsent(topic, true);
}
}
22 changes: 17 additions & 5 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
.sessionPresent(isSessionAlreadyPresent);
if (isProtocolVersion(msg, MqttVersion.MQTT_5)) {
// set properties for MQTT 5
final MqttProperties ackProperties = prepareConnAckProperties(serverGeneratedClientId, clientId);
ConnAckPropertiesBuilder connAckPropertiesBuilder = prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId);
if (isNeedResponseInformation(msg)) {
// the responder and requested access to the topic are already configured during session creation
// in SessionRegistry
connAckPropertiesBuilder.responseInformation("/reqresp/response/" + clientId);
}
final MqttProperties ackProperties = connAckPropertiesBuilder.build();
connAckBuilder.properties(ackProperties);
}
final MqttConnAckMessage ackMessage = connAckBuilder.build();
Expand Down Expand Up @@ -328,6 +334,16 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

/**
* @return true iff message contains property REQUEST_RESPONSE_INFORMATION and is positive.
* */
static boolean isNeedResponseInformation(MqttConnectMessage msg) {
MqttProperties.IntegerProperty requestRespInfo = (MqttProperties.IntegerProperty) msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.REQUEST_RESPONSE_INFORMATION.value());
return requestRespInfo != null && requestRespInfo.value() >= 1;
}

/**
* @return the value of the Payload Format Indicator property from Will specification.
* */
Expand All @@ -352,10 +368,6 @@ private static boolean checkUTF8Validity(byte[] rawBytes) {
return true;
}

private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) {
return prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId).build();
}

private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverGeneratedClientId, String clientId) {
final ConnAckPropertiesBuilder builder = new ConnAckPropertiesBuilder();
// default maximumQos is 2, [MQTT-3.2.2-10]
Expand Down
6 changes: 6 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
newSession = new Session(sessionData, clean, queue);
newSession.markConnecting();
sessionsRepository.saveSession(sessionData);
if (MQTTConnection.isNeedResponseInformation(msg)) {
// the responder client must have write access to this topic
// the requester client must have read access on this topic
authorizator.forceReadAccess(Topic.asTopic("/reqresp/response/" + clientId), clientId);
authorizator.forceWriteToAll(Topic.asTopic("/reqresp/response/" + clientId));
}
return newSession;
}

Expand Down

0 comments on commit 1024925

Please sign in to comment.