From 793ce330296430cd12653d9cdf059ce434bbf008 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Tue, 1 Aug 2023 15:09:45 +0200 Subject: [PATCH] Uses the expiry interval from connect properties (#764) - Retrieve the session expiry interval from Connect property SESSION_EXPIRY_INTERVAL - Added unit test to verify that CONNECT's SESSION_EXPIRY_INTERVAL property is used to set session expiration timeout in seconds - Fallback to global expiry if one is not provided (if session is not clean) - cap the SESSION_EXPIRY_INTERVAL that client can configure with the setting 'persistent_client_expiration' --- .../io/moquette/broker/SessionRegistry.java | 20 ++++++++- .../broker/SessionRegistryMQTT5Test.java | 43 +++++++++++++++++++ .../moquette/broker/SessionRegistryTest.java | 10 ++--- distribution/src/main/resources/moquette.conf | 2 + 4 files changed, 68 insertions(+), 7 deletions(-) create mode 100644 broker/src/test/java/io/moquette/broker/SessionRegistryMQTT5Test.java diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index d43d805e4..6028c94f3 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttVersion; import org.slf4j.Logger; @@ -298,9 +299,24 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) { } else { queue = new InMemoryQueue(); } - // in MQTT3 cleanSession = true means expiryInterval=0 else infinite - final int expiryInterval = clean ? 0 : globalExpirySeconds; + final int expiryInterval; final MqttVersion mqttVersion = Utils.versionFromConnect(msg); + if (mqttVersion != MqttVersion.MQTT_5) { + // in MQTT3 cleanSession = true means expiryInterval=0 else infinite + expiryInterval = clean ? 0 : globalExpirySeconds; + } else { + final MqttProperties.MqttProperty expiryIntervalProperty = + (MqttProperties.MqttProperty) msg.variableHeader().properties() + .getProperty(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL.value()); + if (expiryIntervalProperty != null) { + int preferredExpiryInterval = expiryIntervalProperty.value(); + // limit the maximum expiry, use the value configured in "persistent_client_expiration" + expiryInterval = Math.min(preferredExpiryInterval, globalExpirySeconds); + } else { + // the connect doesn't provide any expiry, fallback to global expiry + expiryInterval = clean ? 0 : globalExpirySeconds; + } + } final ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(clientId, mqttVersion, expiryInterval, clock); if (msg.variableHeader().isWillFlag()) { diff --git a/broker/src/test/java/io/moquette/broker/SessionRegistryMQTT5Test.java b/broker/src/test/java/io/moquette/broker/SessionRegistryMQTT5Test.java new file mode 100644 index 000000000..d0d3dc3cf --- /dev/null +++ b/broker/src/test/java/io/moquette/broker/SessionRegistryMQTT5Test.java @@ -0,0 +1,43 @@ +package io.moquette.broker; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttVersion; +import org.awaitility.Awaitility; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; +public class SessionRegistryMQTT5Test extends SessionRegistryTest { + private static final Logger LOG = LoggerFactory.getLogger(SessionRegistryMQTT5Test.class); + @Test + public void givenSessionWithConnectionExpireTimeWhenAfterExpirationIsPassedThenSessionIsRemoved() { + LOG.info("givenSessionWithExpireTimeWhenAfterExpirationIsPassedThenSessionIsRemoved"); + // insert a not clean session that should expire in connect selected expiry time + final String clientId = "client_to_be_removed"; + final MqttProperties connectProperties = new MqttProperties(); + int customExpirySeconds = 60; + connectProperties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL.value(), customExpirySeconds)); + final MqttConnectMessage connectMessage = MqttMessageBuilders.connect() + .protocolVersion(MqttVersion.MQTT_5) + .cleanSession(false) + .properties(connectProperties) + .build(); + final SessionRegistry.SessionCreationResult res = sut.createOrReopenSession(connectMessage, clientId, "User"); + assertEquals(SessionRegistry.CreationModeEnum.CREATED_CLEAN_NEW, res.mode, "Not clean session must be created"); + // remove it, so that it's tracked in the inner delay queue + sut.connectionClosed(res.session); + assertEquals(1, sessionRepository.list().size(), "Not clean session must be persisted"); + // move time forward + Duration moreThenSessionExpiration = Duration.ofSeconds(customExpirySeconds).plusSeconds(10); + slidingClock.forward(moreThenSessionExpiration); + // check the session has been removed + Awaitility + .await() + .atMost(3 * SessionRegistry.EXPIRED_SESSION_CLEANER_TASK_INTERVAL.toMillis(), TimeUnit.MILLISECONDS) + .until(sessionsList(), Matchers.empty()); + } +} \ No newline at end of file diff --git a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java index b6186c173..4f7939a89 100644 --- a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java @@ -76,15 +76,15 @@ public class SessionRegistryTest { private MQTTConnection connection; private EmbeddedChannel channel; - private SessionRegistry sut; - private MqttMessageBuilders.ConnectBuilder connMsg; + protected SessionRegistry sut; + protected MqttMessageBuilders.ConnectBuilder connMsg; private static final BrokerConfiguration ALLOW_ANONYMOUS_AND_ZEROBYTE_CLIENT_ID = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); private MemoryQueueRepository queueRepository; private ScheduledExecutorService scheduler; private final Clock pointInTimeFixedClock = Clock.fixed(Instant.parse("2023-03-26T18:09:30.00Z"), ZoneId.of("Europe/Rome")); - private ForwardableClock slidingClock = new ForwardableClock(pointInTimeFixedClock); - private ISessionsRepository sessionRepository; + protected ForwardableClock slidingClock = new ForwardableClock(pointInTimeFixedClock); + protected ISessionsRepository sessionRepository; @BeforeEach public void setUp() { @@ -317,7 +317,7 @@ public void givenSessionThatExpiresWhenReopenIsNotAnymoreTrackedForExpiration() .until(sessionsList(), Matchers.not(Matchers.empty())); } - private Callable> sessionsList() { + protected Callable> sessionsList() { return () -> sessionRepository.list(); } } diff --git a/distribution/src/main/resources/moquette.conf b/distribution/src/main/resources/moquette.conf index dc8732119..4eb00b2ae 100644 --- a/distribution/src/main/resources/moquette.conf +++ b/distribution/src/main/resources/moquette.conf @@ -205,6 +205,8 @@ password_file config/password_file.conf # This option allows the session of persistent clients (those with clean session set to false) that are not # currently connected to be removed if they do not reconnect within a certain time frame. # This is a non-standard option in MQTT v3.1. MQTT v3.1.1 and v5.0 allow brokers to remove client sessions. +# For MQTT 5 clients that already defines the property SESSION_EXPIRY_INTERVAL during connection, this +# value is still used as maximum available expiry interval, overriding the client's request if it's bigger. # The expiration period should be an integer followed by one of s m h d w M y for seconds, minutes, hours, days, weeks, # months and years respectively. For example: 2m or 14d or 1y # default: infinite expiry