Skip to content

Commit

Permalink
Uses the expiry interval from connect properties (#764)
Browse files Browse the repository at this point in the history
- Retrieve the session expiry interval from Connect property SESSION_EXPIRY_INTERVAL
- Added unit test to verify that CONNECT's SESSION_EXPIRY_INTERVAL property is used to set session expiration timeout in seconds
- Fallback to global expiry if one is not provided (if session is not clean)
- cap the SESSION_EXPIRY_INTERVAL that client can configure with the setting 'persistent_client_expiration'
  • Loading branch information
andsel authored Aug 1, 2023
1 parent 0b6b1c0 commit 793ce33
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 7 deletions.
20 changes: 18 additions & 2 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.slf4j.Logger;
Expand Down Expand Up @@ -298,9 +299,24 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
} else {
queue = new InMemoryQueue();
}
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
final int expiryInterval = clean ? 0 : globalExpirySeconds;
final int expiryInterval;
final MqttVersion mqttVersion = Utils.versionFromConnect(msg);
if (mqttVersion != MqttVersion.MQTT_5) {
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
expiryInterval = clean ? 0 : globalExpirySeconds;
} else {
final MqttProperties.MqttProperty<Integer> expiryIntervalProperty =
(MqttProperties.MqttProperty<Integer>) msg.variableHeader().properties()
.getProperty(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL.value());
if (expiryIntervalProperty != null) {
int preferredExpiryInterval = expiryIntervalProperty.value();
// limit the maximum expiry, use the value configured in "persistent_client_expiration"
expiryInterval = Math.min(preferredExpiryInterval, globalExpirySeconds);
} else {
// the connect doesn't provide any expiry, fallback to global expiry
expiryInterval = clean ? 0 : globalExpirySeconds;
}
}
final ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(clientId,
mqttVersion, expiryInterval, clock);
if (msg.variableHeader().isWillFlag()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.moquette.broker;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SessionRegistryMQTT5Test extends SessionRegistryTest {
private static final Logger LOG = LoggerFactory.getLogger(SessionRegistryMQTT5Test.class);
@Test
public void givenSessionWithConnectionExpireTimeWhenAfterExpirationIsPassedThenSessionIsRemoved() {
LOG.info("givenSessionWithExpireTimeWhenAfterExpirationIsPassedThenSessionIsRemoved");
// insert a not clean session that should expire in connect selected expiry time
final String clientId = "client_to_be_removed";
final MqttProperties connectProperties = new MqttProperties();
int customExpirySeconds = 60;
connectProperties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL.value(), customExpirySeconds));
final MqttConnectMessage connectMessage = MqttMessageBuilders.connect()
.protocolVersion(MqttVersion.MQTT_5)
.cleanSession(false)
.properties(connectProperties)
.build();
final SessionRegistry.SessionCreationResult res = sut.createOrReopenSession(connectMessage, clientId, "User");
assertEquals(SessionRegistry.CreationModeEnum.CREATED_CLEAN_NEW, res.mode, "Not clean session must be created");
// remove it, so that it's tracked in the inner delay queue
sut.connectionClosed(res.session);
assertEquals(1, sessionRepository.list().size(), "Not clean session must be persisted");
// move time forward
Duration moreThenSessionExpiration = Duration.ofSeconds(customExpirySeconds).plusSeconds(10);
slidingClock.forward(moreThenSessionExpiration);
// check the session has been removed
Awaitility
.await()
.atMost(3 * SessionRegistry.EXPIRED_SESSION_CLEANER_TASK_INTERVAL.toMillis(), TimeUnit.MILLISECONDS)
.until(sessionsList(), Matchers.empty());
}
}
10 changes: 5 additions & 5 deletions broker/src/test/java/io/moquette/broker/SessionRegistryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ public class SessionRegistryTest {

private MQTTConnection connection;
private EmbeddedChannel channel;
private SessionRegistry sut;
private MqttMessageBuilders.ConnectBuilder connMsg;
protected SessionRegistry sut;
protected MqttMessageBuilders.ConnectBuilder connMsg;
private static final BrokerConfiguration ALLOW_ANONYMOUS_AND_ZEROBYTE_CLIENT_ID =
new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH);
private MemoryQueueRepository queueRepository;
private ScheduledExecutorService scheduler;
private final Clock pointInTimeFixedClock = Clock.fixed(Instant.parse("2023-03-26T18:09:30.00Z"), ZoneId.of("Europe/Rome"));
private ForwardableClock slidingClock = new ForwardableClock(pointInTimeFixedClock);
private ISessionsRepository sessionRepository;
protected ForwardableClock slidingClock = new ForwardableClock(pointInTimeFixedClock);
protected ISessionsRepository sessionRepository;

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -317,7 +317,7 @@ public void givenSessionThatExpiresWhenReopenIsNotAnymoreTrackedForExpiration()
.until(sessionsList(), Matchers.not(Matchers.empty()));
}

private Callable<Collection<ISessionsRepository.SessionData>> sessionsList() {
protected Callable<Collection<ISessionsRepository.SessionData>> sessionsList() {
return () -> sessionRepository.list();
}
}
2 changes: 2 additions & 0 deletions distribution/src/main/resources/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ password_file config/password_file.conf
# This option allows the session of persistent clients (those with clean session set to false) that are not
# currently connected to be removed if they do not reconnect within a certain time frame.
# This is a non-standard option in MQTT v3.1. MQTT v3.1.1 and v5.0 allow brokers to remove client sessions.
# For MQTT 5 clients that already defines the property SESSION_EXPIRY_INTERVAL during connection, this
# value is still used as maximum available expiry interval, overriding the client's request if it's bigger.
# The expiration period should be an integer followed by one of s m h d w M y for seconds, minutes, hours, days, weeks,
# months and years respectively. For example: 2m or 14d or 1y
# default: infinite expiry
Expand Down

0 comments on commit 793ce33

Please sign in to comment.