Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt-streaming: test two fixes for flaky tests #460

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 128 additions & 134 deletions mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import pekko.testkit._
import pekko.util.{ ByteString, Timeout }
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.time.{ Millis, Minutes, Span }
import org.scalatest.time.{ Millis, Span }

import scala.concurrent.{ ExecutionContext, Promise }
import scala.concurrent.duration._
Expand Down Expand Up @@ -1173,7 +1173,7 @@ class MqttSessionSpec
val unsubAck = UnsubAck(PacketId(1))
val unsubAckReceived = Promise[Done]()

val (client, result) =
val (client, results) =
Source
.queue(2, OverflowStrategy.fail)
.via(
Expand All @@ -1188,11 +1188,7 @@ class MqttSessionSpec
case _ =>
}
}
.takeWhile {
case Right(Event(PingResp, None)) => false
case _ => true
}
.toMat(Sink.seq)(Keep.both)
.toMat(Sink.queue(10))(Keep.both)
.run()

val connect = Connect("some-client-id", ConnectFlags.None)
Expand Down Expand Up @@ -1232,18 +1228,19 @@ class MqttSessionSpec
client.offer(Command(unsubscribe))

server.expectMsg(unsubscribeBytes)
// Possible to receive a pub from an unsubscribed topic given that it may be in transit
server.reply(unsubAckBytes ++ publishDupBytes)

unsubAckReceived.future.futureValue shouldBe Done

results.queue.pull().futureValue shouldBe Some(Right(Event(unsubAck)))
results.queue.pull().futureValue shouldBe Some(Right(Event(publishDup)))
client.offer(Command(pubAck))

server.expectMsg(pubAckBytes)
server.reply(pingRespBytes)

// Quite possible to receive a pub from an unsubscribed topic given that it may be in transit
result.futureValue shouldBe Vector(Right(Event(unsubAck)), Right(Event(publishDup)))

results.cancel()
client.complete()
client.watchCompletion().foreach(_ => session.shutdown())
}
Expand Down Expand Up @@ -1922,159 +1919,156 @@ class MqttSessionSpec
// longer patience needed since Akka 2.6
implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(1.second), scaled(50.millis))

// https://github.com/apache/incubator-pekko-connectors/issues/148
eventually(timeout(Span(1, Minutes))) {
val serverSession = ActorMqttServerSession(settings.withProducerPubAckRecTimeout(10.millis))
val serverSession = ActorMqttServerSession(settings.withProducerPubAckRecTimeout(10.millis))

val client1 = TestProbe()
val toClient1 = Sink.foreach[ByteString](bytes => client1.ref ! bytes)
val (client1Connection, fromClient1) = Source
.queue[ByteString](1, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
val client1 = TestProbe()
val toClient1 = Sink.foreach[ByteString](bytes => client1.ref ! bytes)
val (client1Connection, fromClient1) = Source
.queue[ByteString](1, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()

val pipeToClient1 = Flow.fromSinkAndSource(toClient1, fromClient1)
val pipeToClient1 = Flow.fromSinkAndSource(toClient1, fromClient1)

val client2 = TestProbe()
val toClient2 = Sink.foreach[ByteString](bytes => client2.ref ! bytes)
val (client2Connection, fromClient2) = Source
.queue[ByteString](0, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
val client2 = TestProbe()
val toClient2 = Sink.foreach[ByteString](bytes => client2.ref ! bytes)
val (client2Connection, fromClient2) = Source
.queue[ByteString](0, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()

val pipeToClient2 = Flow.fromSinkAndSource(toClient2, fromClient2)
val pipeToClient2 = Flow.fromSinkAndSource(toClient2, fromClient2)

val clientId = "some-client-id"

val clientId = "some-client-id"
val connect = Connect(clientId, ConnectFlags.None)
val connect1Received = Promise[Done]()
val connect2Received = Promise[Done]()

val connect = Connect(clientId, ConnectFlags.None)
val connect1Received = Promise[Done]()
val connect2Received = Promise[Done]()
val subscribe = Subscribe("some-topic")
val subscribe1Received = Promise[Done]()
val subscribe2Received = Promise[Done]()

val subscribe = Subscribe("some-topic")
val subscribe1Received = Promise[Done]()
val subscribe2Received = Promise[Done]()
val pubAckReceived = Promise[Done]()

val pubAckReceived = Promise[Done]()
val disconnect = Disconnect
val disconnectReceived = Promise[Done]()

val disconnect = Disconnect
val disconnectReceived = Promise[Done]()
val serverConnection1 =
Source
.queue[Command[Nothing]](1, OverflowStrategy.fail)
.via(
Mqtt
.serverSessionFlow(serverSession, ByteString("connection 1"))
.join(pipeToClient1))
.wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
case Right(Event(`connect`, _)) =>
connect1Received.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
subscribe1Received.success(Done)
case Right(Event(`disconnect`, _)) =>
disconnectReceived.success(Done)
case other => fail(s"didn't match `$other`")
})
.toMat(Sink.seq)(Keep.left)
.run()

val connectBytes = connect.encode(ByteString.newBuilder).result()
val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
val connAckBytes = connAck.encode(ByteString.newBuilder).result()

val subscribeBytes = subscribe.encode(ByteString.newBuilder, PacketId(1)).result()
val subAck = SubAck(PacketId(1), List(ControlPacketFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()

val serverConnection1 =
Source
.queue[Command[Nothing]](1, OverflowStrategy.fail)
.via(
Mqtt
.serverSessionFlow(serverSession, ByteString.empty)
.join(pipeToClient1))
.wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
case Right(Event(`connect`, _)) =>
connect1Received.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
subscribe1Received.success(Done)
case Right(Event(`disconnect`, _)) =>
disconnectReceived.success(Done)
case other => fail(s"didn't match `$other`")
})
.toMat(Sink.seq)(Keep.left)
.run()
val publish = Publish("some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder, Some(PacketId(1))).result()
val dupPublishBytes = publish
.copy(flags = publish.flags | ControlPacketFlags.DUP)
.encode(ByteString.newBuilder, Some(PacketId(1)))
.result()
val pubAck = PubAck(PacketId(1))
val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()

val connectBytes = connect.encode(ByteString.newBuilder).result()
val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
val connAckBytes = connAck.encode(ByteString.newBuilder).result()
val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()

val subscribeBytes = subscribe.encode(ByteString.newBuilder, PacketId(1)).result()
val subAck = SubAck(PacketId(1), List(ControlPacketFlags.QoSAtLeastOnceDelivery))
val subAckBytes = subAck.encode(ByteString.newBuilder).result()
client1Connection.offer(connectBytes)

val publish = Publish("some-topic", ByteString("some-payload"))
val publishBytes = publish.encode(ByteString.newBuilder, Some(PacketId(1))).result()
val dupPublishBytes = publish
.copy(flags = publish.flags | ControlPacketFlags.DUP)
.encode(ByteString.newBuilder, Some(PacketId(1)))
.result()
val pubAck = PubAck(PacketId(1))
val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
connect1Received.future.futureValue shouldBe Done

val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()
serverConnection1.offer(Command(connAck))
client1.expectMsg(connAckBytes)

client1Connection.offer(connectBytes)
client1Connection.offer(subscribeBytes)

connect1Received.future.futureValue shouldBe Done
subscribe1Received.future.futureValue shouldBe Done

serverConnection1.offer(Command(connAck))
client1.expectMsg(connAckBytes)
serverConnection1.offer(Command(subAck))
client1.expectMsg(subAckBytes)

client1Connection.offer(subscribeBytes)
serverSession ! Command(publish)
client1.expectMsg(publishBytes)

subscribe1Received.future.futureValue shouldBe Done
// Perform an explicit disconnect otherwise, if for example, we
// just completed the client connection, the session may receive
// the associated ConnectionLost signal for the new connection
// given that the new connection occurs so quickly.
client1Connection.offer(disconnectBytes)

serverConnection1.offer(Command(subAck))
client1.expectMsg(subAckBytes)
disconnectReceived.future.futureValue shouldBe Done

serverSession ! Command(publish)
client1.expectMsg(publishBytes)
val serverConnection2 =
Source
.queue[Command[Nothing]](1, OverflowStrategy.fail)
.via(
Mqtt
.serverSessionFlow(serverSession, ByteString("connection 2"))
.join(pipeToClient2))
.wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
case Right(Event(`connect`, _)) =>
connect2Received.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
subscribe2Received.success(Done)
case Right(Event(_: PubAck, _)) =>
pubAckReceived.success(Done)
case other => fail(s"didn't match `$other`")
})
.toMat(Sink.seq)(Keep.left)
.run()

// Perform an explicit disconnect otherwise, if for example, we
// just completed the client connection, the session may receive
// the associated ConnectionLost signal for the new connection
// given that the new connection occurs so quickly.
client1Connection.offer(disconnectBytes)
client2Connection.offer(connectBytes)

disconnectReceived.future.futureValue shouldBe Done
connect2Received.future.futureValue shouldBe Done

val serverConnection2 =
Source
.queue[Command[Nothing]](1, OverflowStrategy.fail)
.via(
Mqtt
.serverSessionFlow(serverSession, ByteString.empty)
.join(pipeToClient2))
.wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
case Right(Event(`connect`, _)) =>
connect2Received.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
subscribe2Received.success(Done)
case Right(Event(_: PubAck, _)) =>
pubAckReceived.success(Done)
case other => fail(s"didn't match `$other`")
})
.toMat(Sink.seq)(Keep.left)
.run()

client2Connection.offer(connectBytes)

connect2Received.future.futureValue shouldBe Done

serverConnection2.offer(Command(connAck))
client2.expectMsg(6.seconds, connAckBytes)

client2Connection.offer(subscribeBytes)

subscribe2Received.future.futureValue shouldBe Done

serverConnection2.offer(Command(subAck))

client2.fishForMessage(3.seconds.dilated) {
case msg: ByteString if msg == dupPublishBytes => true
case _ => false
}
serverConnection2.offer(Command(connAck))
client2.expectMsg(6.seconds, connAckBytes)

client2Connection.offer(pubAckBytes)
pubAckReceived.future.futureValue shouldBe Done
client2Connection.offer(subscribeBytes)

client1Connection.complete()
client2Connection.complete()
serverConnection1.complete()
serverConnection2.complete()
subscribe2Received.future.futureValue shouldBe Done

for {
_ <- client1Connection.watchCompletion()
_ <- client2Connection.watchCompletion()
_ <- serverConnection1.watchCompletion()
_ <- serverConnection2.watchCompletion()
} serverSession.shutdown()
serverConnection2.offer(Command(subAck))

client2.fishForMessage(3.seconds.dilated) {
case msg: ByteString if msg == dupPublishBytes => true
case _ => false
}

client2Connection.offer(pubAckBytes)
pubAckReceived.future.futureValue shouldBe Done

client1Connection.complete()
client2Connection.complete()
serverConnection1.complete()
serverConnection2.complete()

for {
_ <- client1Connection.watchCompletion()
_ <- client2Connection.watchCompletion()
_ <- serverConnection1.watchCompletion()
_ <- serverConnection2.watchCompletion()
} serverSession.shutdown()

}
}

Expand Down
Loading