Skip to content

Commit

Permalink
Updated will publishing to contains also the message expire property (#…
Browse files Browse the repository at this point in the history
…824)

Updated will publishing to contains also the message expire property.

Change the publishing of will messages (PostOffice.publishWill method) to include also the message expiry property.
  • Loading branch information
andsel authored Apr 18, 2024
1 parent dd1600d commit 9b9f286
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 27 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Version 0.18-SNAPSHOT:
- Implements the management of message expiry for retained part. (#819)
- Avoid to publish messages that has elapsed its expire property. (#822)
- Update the message expiry property remaining seconds when a publish is forwarded. (#823)
- Updated will publishing to contains also the message expire property. (#824)
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
Expand Down
14 changes: 12 additions & 2 deletions broker/src/main/java/io/moquette/broker/ISessionsRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,10 @@ public Will(String topic, byte[] payload, MqttQoS qos, boolean retained, int del
}

/**
* Used only when update with an expire instant.
* Copy constructor used only when update with an expire instant.
* */
public Will(Will orig, Instant expireAt) {
this(orig.topic, orig.payload, orig.qos, orig.retained, orig.delayInterval);
this(orig.topic, orig.payload, orig.qos, orig.retained, orig.delayInterval, orig.properties);
this.expireAt = expireAt;
}

Expand All @@ -276,6 +276,16 @@ public Will(Will orig, WillOptions properties) {
this.properties = properties;
}

private Will(String topic, byte[] payload, MqttQoS qos, boolean retained, int delayInterval,
WillOptions properties) {
this.topic = topic;
this.payload = payload;
this.qos = qos;
this.retained = retained;
this.delayInterval = delayInterval;
this.properties = properties;
}

@Override
public Optional<Instant> expireAt() {
return Optional.ofNullable(expireAt);
Expand Down
12 changes: 11 additions & 1 deletion broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,17 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession
}

private void publishWill(ISessionsRepository.Will will) {
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic), will.qos, will.retained, Instant.MAX);
final Instant messageExpiryInstant = willMessageExpiry(will);
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic),
will.qos, will.retained, messageExpiryInstant);
}

private static Instant willMessageExpiry(ISessionsRepository.Will will) {
Optional<Duration> messageExpiryOpt = will.properties.messageExpiry();
if (messageExpiryOpt.isPresent()) {
return Instant.now().plus(messageExpiryOpt.get());
}
return Instant.MAX;
}

/**
Expand Down
67 changes: 56 additions & 11 deletions broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublishBuilder;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.*;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -33,6 +31,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -118,11 +117,16 @@ public void receiveInflightPublishesAfterAReconnect() {
.atMost(2, TimeUnit.SECONDS)
.until(reconnectingSubscriber::hasReceivedMessages);

final String publishPayload = reconnectingSubscriber.nextQueuedMessage()
Optional<MqttPublishMessage> opt = reconnectingSubscriber.nextQueuedMessage()
.filter(m -> m instanceof MqttPublishMessage)
.map(m -> (MqttPublishMessage) m)
.map(m -> m.payload().toString(StandardCharsets.UTF_8))
.orElse("Fake Payload");
.map(m -> (MqttPublishMessage) m);
final String publishPayload;
if (opt.isPresent()) {
publishPayload = opt.get().payload().toString(StandardCharsets.UTF_8);
opt.get().release();
} else {
publishPayload = "Fake Payload";
}
assertEquals("Hello", publishPayload, "The inflight payload from previous subscription MUST be received");

reconnectingSubscriber.disconnect();
Expand All @@ -144,6 +148,27 @@ public void fireWillAfterTheDelaySpecifiedInConnectProperties() throws Interrupt
verifyPublishedMessage(testamentSubscriber, 10, "Will message must be received");
}

@Test
public void fireWillAfterTheDelaySpecifiedInConnectPropertiesAndMessageExpiry() throws InterruptedException {
int messageExpiry = 5;
final Mqtt5BlockingClient clientWithWill =
createAndConnectClientWithWillTestamentAndMessageExpiry("simple_client", 1, messageExpiry);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();

// schedule a bad disconnect
scheduleDisconnectWithErrorCode(clientWithWill, Duration.ofMillis(500));

TestUtils.verifyPublishedMessage(testamentSubscriber, 10,
(Mqtt5Publish message) -> {
final String payload = new String(message.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("Goodbye", payload, "Will message must be received");

long expiry = message.getMessageExpiryInterval().orElse(-1L);
assertEquals(messageExpiry, expiry);
});
}

private static void verifyPublishedMessage(Mqtt5BlockingClient subscriber, int timeout, String message) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeout, TimeUnit.SECONDS);
Expand Down Expand Up @@ -287,7 +312,16 @@ private Mqtt5BlockingClient createAndConnectClientWithWillTestament(String clien

@NotNull
private static Mqtt5BlockingClient createAndConnectClientWithWillTestament(String clientId, int delayInSeconds) {
Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = deafaultWillBuilder(delayInSeconds);

Mqtt5ConnectBuilder connectBuilder = willPublishBuilder.applyWillPublish();

return createAndConnectWithBuilder(clientId, connectBuilder);
}

@NotNull
private static Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> deafaultWillBuilder(int delayInSeconds) {
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = Mqtt5Connect.builder()
.keepAlive(10)
.willPublish()
.topic("/will_testament")
Expand All @@ -296,8 +330,19 @@ private static Mqtt5BlockingClient createAndConnectClientWithWillTestament(Strin
.contentType("something content type here")
.userProperties()
.add("test_property", "value of a property")
.applyUserProperties()
.applyWillPublish();
.applyUserProperties();
return willPublishBuilder;
}

@NotNull
private static Mqtt5BlockingClient createAndConnectClientWithWillTestamentAndMessageExpiry(String clientId,
int delayInSeconds,
int messageExpirySeconds) {
Mqtt5WillPublishBuilder.Nested.Complete<? extends Mqtt5ConnectBuilder> willPublishBuilder = deafaultWillBuilder(delayInSeconds);

willPublishBuilder.messageExpiryInterval(messageExpirySeconds);

Mqtt5ConnectBuilder connectBuilder = willPublishBuilder.applyWillPublish();

return createAndConnectWithBuilder(clientId, connectBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public String clientName() {
}

@Test
public void givenPublishWithRetainedAndMessageExpiryWhenTimePassedThenRetainedIsNotForwardedOnSubscription() throws InterruptedException {
public void givenPublishWithRetainedAndMessageExpiryWhenTimePassedThenRetainedIsNotForwardedOnSubscription() throws InterruptedException, MqttException {
Mqtt5BlockingClient publisher = createPublisherClient();
int messageExpiryInterval = 3; //seconds
publisher.publishWith()
Expand Down Expand Up @@ -113,6 +113,7 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimeIsNotExpiredAndSubsc
.getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value());
assertNotNull(messageExpiryProperty, "message expiry property must be present");
assertTrue(messageExpiryProperty.value() < messageExpiryInterval, "Forwarded message expiry should be lowered");
assertTrue(publish.release(), "Last reference of publish should be released");
}

@Test
Expand Down Expand Up @@ -184,11 +185,11 @@ public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanT
public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThenExpiryValueHasToBeDeducedByTheTimeSpentInBroker() throws InterruptedException {
int messageExpiryInterval = 10; // seconds
// avoid the keep alive period could disconnect
connectLowLevel((int)(messageExpiryInterval * 1.5));
connectLowLevel((int) (messageExpiryInterval * 1.5));

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123);
MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS);
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down Expand Up @@ -225,6 +226,7 @@ public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThe
Integer expirySeconds = ((MqttProperties.IntegerProperty) expiryProp).value();

assertTrue(expirySeconds < messageExpiryInterval, "Publish's expiry has to be updated");
assertTrue(publishMessage.release(), "Last reference of publish should be released");
}

private void consumesPublishesInflightWindow(int inflightWindowSize) throws InterruptedException {
Expand All @@ -236,6 +238,7 @@ private void consumesPublishesInflightWindow(int inflightWindowSize) throws Inte
MqttPublishMessage publish = (MqttPublishMessage) mqttMessage;
assertEquals(Integer.toString(i), publish.payload().toString(StandardCharsets.UTF_8));
int packetId = publish.variableHeader().packetId();
assertTrue(publish.release(), "Reference of publish should be released");

MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
false, 0);
Expand All @@ -255,7 +258,7 @@ private static void fillInFlightWindow(int inflightWindowSize, Mqtt5BlockingClie
.messageExpiryInterval(messageExpiryInterval);
}

builder.send();
builder.send();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.*;

public class SubscriptionWithIdentifierTest extends AbstractSubscriptionIntegrationTest {

Expand Down Expand Up @@ -45,7 +44,9 @@ public void givenNonSharedSubscriptionWithIdentifierWhenPublishMatchedThenReceiv
.until(lowLevelClient::hasReceivedMessages);
MqttMessage mqttMsg = lowLevelClient.receiveNextMessage(Duration.ofSeconds(1));
verifyOfType(mqttMsg, MqttMessageType.PUBLISH);
verifySubscriptionIdentifier(123, (MqttPublishMessage) mqttMsg);
MqttPublishMessage mqttPublish = (MqttPublishMessage) mqttMsg;
verifySubscriptionIdentifier(123, mqttPublish);
assertTrue(mqttPublish.release(), "Reference of publish should be released");
}

private static void verifySubscriptionIdentifier(int expectedSubscriptionIdentifier, MqttPublishMessage publish) {
Expand Down Expand Up @@ -82,6 +83,8 @@ public void givenNonSharedSubscriptionWithIdentifierWhenRetainedMessageMatchedTh
.until(lowLevelClient::hasReceivedMessages);
MqttMessage mqttMsg = lowLevelClient.receiveNextMessage(Duration.ofSeconds(1));
verifyOfType(mqttMsg, MqttMessageType.PUBLISH);
verifySubscriptionIdentifier(123, (MqttPublishMessage) mqttMsg);
MqttPublishMessage mqttPublish = (MqttPublishMessage) mqttMsg;
verifySubscriptionIdentifier(123, mqttPublish);
assertTrue(mqttPublish.release(), "Reference of publish should be released");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import static org.junit.jupiter.api.Assertions.fail;

public class TestUtils {
static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer<Mqtt5Publish> verifier, int timeoutSeconds) throws Exception {
static void verifyPublishedMessage(Mqtt5BlockingClient client, int timeoutSeconds, Consumer<Mqtt5Publish> verifier) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
fail("Expected to receive a publish in " + timeoutSeconds + " seconds");
return;
}
verifier.accept(publishMessage.get());
Expand Down
7 changes: 4 additions & 3 deletions broker/src/test/java/io/moquette/testclient/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.moquette.BrokerConstants;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
Expand Down Expand Up @@ -78,9 +79,9 @@ public Client(String host, int port) {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MqttDecoder());
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("handler", handler);
pipeline.addLast("rawcli_decoder", new MqttDecoder());
pipeline.addLast("rawcli_encoder", MqttEncoder.INSTANCE);
pipeline.addLast("rawcli_handler", handler);
}
});

Expand Down

0 comments on commit 9b9f286

Please sign in to comment.