Skip to content

Commit

Permalink
Fix flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wkal-pubnub committed Aug 22, 2024
1 parent 5220bc2 commit 8294d6b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class ChannelIntegrationTest : BaseChatIntegrationTest() {
assertEquals(userName, userSuggestionsMembershipsFromCache.first().user.name)
}

// todo fix
// todo flaky
@Test
fun streamReadReceipts() = runTest(timeout = 10.seconds) {
val completableBeforeMark = CompletableDeferred<Unit>()
Expand All @@ -261,22 +261,27 @@ class ChannelIntegrationTest : BaseChatIntegrationTest() {
chat.markAllMessagesAsRead().await()

val tt = channel.sendText("text2").await().timetoken
val dispose = channel.streamReadReceipts { receipts ->
val lastRead = receipts.entries.find { it.value.contains(chat.currentUser.id) }?.key
if (lastRead != null) {
if (tt > lastRead) {
completableBeforeMark.complete(Unit) // before calling markAllMessagesRead
} else {
completableAfterMark.complete(Unit) // after calling markAllMessagesRead
var dispose: AutoCloseable? = null
pubnub.test(backgroundScope, checkAllEvents = false) {
pubnub.awaitSubscribe(listOf(channel.id)) {
dispose = channel.streamReadReceipts { receipts ->
val lastRead = receipts.entries.find { it.value.contains(chat.currentUser.id) }?.key
if (lastRead != null) {
if (tt > lastRead) {
completableBeforeMark.complete(Unit) // before calling markAllMessagesRead
} else {
completableAfterMark.complete(Unit) // after calling markAllMessagesRead
}
}
}
}
}

completableBeforeMark.await()
chat.markAllMessagesAsRead().await()
completableAfterMark.await()
completableBeforeMark.await()
chat.markAllMessagesAsRead().await()
completableAfterMark.await()

dispose.close()
dispose?.close()
}
}

@Test
Expand Down Expand Up @@ -440,32 +445,37 @@ class ChannelIntegrationTest : BaseChatIntegrationTest() {
val message = channel01.getMessage(timetoken).await()!!
val assertionErrorInCallback = CompletableDeferred<AssertionError?>()

val streamMessageReports = channel01.streamMessageReports { reportEvent: Event<EventContent.Report> ->
try {
// we need to have try/catch here because assertion error will not cause test to fail
numberOfReports.incrementAndGet()
val reportReason = reportEvent.payload.reason
assertTrue(reportReason == reason01 || reportReason == reason02)
assertEquals(messageText, reportEvent.payload.text)
assertTrue(reportEvent.payload.reportedMessageChannelId?.contains(INTERNAL_MODERATION_PREFIX)!!)
assertTrue(reportEvent.channelId.contains(INTERNAL_MODERATION_PREFIX))
if (numberOfReports.value == 2) {
assertionErrorInCallback.complete(null)
pubnub.test(backgroundScope, checkAllEvents = false) {
var streamMessageReportsCloseable: AutoCloseable? = null

pubnub.awaitSubscribe(listOf("PUBNUB_INTERNAL_MODERATION_${channel01.id}")) {
streamMessageReportsCloseable = channel01.streamMessageReports { reportEvent: Event<EventContent.Report> ->
try {
// we need to have try/catch here because assertion error will not cause test to fail
numberOfReports.incrementAndGet()
val reportReason = reportEvent.payload.reason
assertTrue(reportReason == reason01 || reportReason == reason02)
assertEquals(messageText, reportEvent.payload.text)
assertTrue(reportEvent.payload.reportedMessageChannelId?.contains(INTERNAL_MODERATION_PREFIX)!!)
assertTrue(reportEvent.channelId.contains(INTERNAL_MODERATION_PREFIX))
if (numberOfReports.value == 2) {
assertionErrorInCallback.complete(null)
}
} catch (e: AssertionError) {
assertionErrorInCallback.complete(e)
}
}
} catch (e: AssertionError) {
assertionErrorInCallback.complete(e)
}
}
delayInMillis(550)

// report messages
message.report(reason01).await()
message.report(reason02).await()
// report messages
message.report(reason01).await()
message.report(reason02).await()

assertionErrorInCallback.await()?.let { assertionError -> throw (assertionError) }
assertEquals(2, numberOfReports.value)
assertionErrorInCallback.await()?.let { assertionError -> throw (assertionError) }
assertEquals(2, numberOfReports.value)

streamMessageReports.close()
streamMessageReportsCloseable?.close()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,64 +158,78 @@ class ChatIntegrationTest : BaseChatIntegrationTest() {

// register lister of "Receipt" event
val assertionErrorInListener01 = CompletableDeferred<AssertionError?>()
val removeListenerAndUnsubscribe01: AutoCloseable = chat.listenForEvents<EventContent.Receipt>(
channelId = channelId01
) { event: Event<EventContent.Receipt> ->
try {
// we need to have try/catch here because assertion error will not cause test to fail
assertEquals(channelId01, event.channelId)
assertEquals(chat.currentUser.id, event.userId)
assertNotEquals(lastReadMessageTimetokenValue, event.payload.messageTimetoken)
assertEquals(lastPublishToChannel01.timetoken, event.payload.messageTimetoken)
assertionErrorInListener01.complete(null)
} catch (e: AssertionError) {
assertionErrorInListener01.complete(e)
}
}
val assertionErrorInListener02 = CompletableDeferred<AssertionError?>()
val removeListenerAndUnsubscribe02 = chat.listenForEvents(
type = EventContent.Receipt::class,
channelId = channelId02
) { event: Event<EventContent.Receipt> ->
try {
// we need to have try/catch here because assertion error will not cause test to fail
assertEquals(channelId02, event.channelId)
assertEquals(chat.currentUser.id, event.userId)
assertNotEquals(lastReadMessageTimetokenValue, event.payload.messageTimetoken)
assertEquals(lastPublishToChannel02.timetoken, event.payload.messageTimetoken)
assertionErrorInListener02.complete(null)
} catch (e: AssertionError) {
assertionErrorInListener02.complete(e)

var removeListenerAndUnsubscribe01: AutoCloseable? = null
var removeListenerAndUnsubscribe02: AutoCloseable? = null
pubnub.test(backgroundScope, checkAllEvents = false) {
pubnub.awaitSubscribe(listOf(channel01.id, channel02.id)) {
removeListenerAndUnsubscribe01 = chat.listenForEvents<EventContent.Receipt>(
channelId = channelId01
) { event: Event<EventContent.Receipt> ->
try {
// we need to have try/catch here because assertion error will not cause test to fail
assertEquals(channelId01, event.channelId)
assertEquals(chat.currentUser.id, event.userId)
assertNotEquals(lastReadMessageTimetokenValue, event.payload.messageTimetoken)
assertEquals(lastPublishToChannel01.timetoken, event.payload.messageTimetoken)
assertionErrorInListener01.complete(null)
} catch (e: AssertionError) {
assertionErrorInListener01.complete(e)
}
}

removeListenerAndUnsubscribe02 = chat.listenForEvents(
type = EventContent.Receipt::class,
channelId = channelId02
) { event: Event<EventContent.Receipt> ->
try {
// we need to have try/catch here because assertion error will not cause test to fail
assertEquals(channelId02, event.channelId)
assertEquals(chat.currentUser.id, event.userId)
assertNotEquals(lastReadMessageTimetokenValue, event.payload.messageTimetoken)
assertEquals(lastPublishToChannel02.timetoken, event.payload.messageTimetoken)
assertionErrorInListener02.complete(null)
} catch (e: AssertionError) {
assertionErrorInListener02.complete(e)
}
}
}
}

// then
val markAllMessageAsReadResponse: MarkAllMessageAsReadResponse = chat.markAllMessagesAsRead().await()
// then
val markAllMessageAsReadResponse: MarkAllMessageAsReadResponse = chat.markAllMessagesAsRead().await()

// verify response contains updated "lastReadMessageTimetoken"
markAllMessageAsReadResponse.memberships.forEach { membership: Membership ->
// why membership.custom!!["lastReadMessageTimetoken"] returns double? <--this is default behaviour of GSON
assertNotEquals(lastReadMessageTimetokenValue, membership.custom!!["lastReadMessageTimetoken"].tryLong())
}
// verify response contains updated "lastReadMessageTimetoken"
markAllMessageAsReadResponse.memberships.forEach { membership: Membership ->
// why membership.custom!!["lastReadMessageTimetoken"] returns double? <--this is default behaviour of GSON
assertNotEquals(
lastReadMessageTimetokenValue,
membership.custom!!["lastReadMessageTimetoken"].tryLong()
)
}

// verify each Membership has updated custom value for "lastReadMessageTimetoken"
val userMembership: MembershipsResponse = chat.currentUser.getMemberships().await()
userMembership.memberships.forEach { membership: Membership ->
assertNotEquals(lastReadMessageTimetokenValue, membership.custom!!["lastReadMessageTimetoken"].tryLong())
}
// verify each Membership has updated custom value for "lastReadMessageTimetoken"
val userMembership: MembershipsResponse = chat.currentUser.getMemberships().await()
userMembership.memberships.forEach { membership: Membership ->
assertNotEquals(
lastReadMessageTimetokenValue,
membership.custom!!["lastReadMessageTimetoken"].tryLong()
)
}

// verify assertion inside listeners
assertionErrorInListener01.await()?.let { throw it }
assertionErrorInListener02.await()?.let { throw it }
// verify assertion inside listeners
assertionErrorInListener01.await()?.let { throw it }
assertionErrorInListener02.await()?.let { throw it }

// remove messages
chat.pubNub.deleteMessages(listOf(channelId01, channelId02))
// remove messages
chat.pubNub.deleteMessages(listOf(channelId01, channelId02))

// remove listeners and unsubscribe
removeListenerAndUnsubscribe01.close()
removeListenerAndUnsubscribe02.close()
// remove listeners and unsubscribe
removeListenerAndUnsubscribe01?.close()
removeListenerAndUnsubscribe02?.close()

// remove memberships (user). This will be done in tearDown method
// remove memberships (user). This will be done in tearDown method
}
}

@Ignore // fails from time to time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,34 +99,37 @@ class MessageIntegrationTest : BaseChatIntegrationTest() {
val reason = "rude"
val assertionErrorInListener01 = CompletableDeferred<AssertionError?>()
val channelId = "$INTERNAL_MODERATION_PREFIX${channel01.id}"
val removeListenerAndUnsubscribe: AutoCloseable = chat.listenForEvents<EventContent.Report>(
channelId = channelId,
callback = { event: Event<EventContent.Report> ->
println("-= in listenForEvents")
try {
// we need to have try/catch here because assertion error will not cause test to fail
assertEquals(reason, event.payload.reason)
assertEquals(channelId, event.payload.reportedMessageChannelId)
assertEquals(channelId, event.channelId)
assertEquals(someUser.id, event.payload.reportedUserId)
assertEquals(timetoken, event.payload.reportedMessageTimetoken)
assertionErrorInListener01.complete(null)
} catch (e: AssertionError) {
assertionErrorInListener01.complete(e)
}
pubnub.test(backgroundScope, checkAllEvents = false) {
var removeListenerAndUnsubscribe: AutoCloseable? = null
pubnub.awaitSubscribe(listOf(channelId)) {
removeListenerAndUnsubscribe = chat.listenForEvents<EventContent.Report>(
channelId = channelId,
callback = { event: Event<EventContent.Report> ->
println("-= in listenForEvents")
try {
// we need to have try/catch here because assertion error will not cause test to fail
assertEquals(reason, event.payload.reason)
assertEquals(channelId, event.payload.reportedMessageChannelId)
assertEquals(channelId, event.channelId)
assertEquals(someUser.id, event.payload.reportedUserId)
assertEquals(timetoken, event.payload.reportedMessageTimetoken)
assertionErrorInListener01.complete(null)
} catch (e: AssertionError) {
assertionErrorInListener01.complete(e)
}
}
)
}
)
delayInMillis(150)

// when
val message: Message = channel01.getMessage(timetoken).await()!!
message.report(reason).await()
// when
val message: Message = channel01.getMessage(timetoken).await()!!
message.report(reason).await()

// then
assertionErrorInListener01.await()?.let { throw it }
// then
assertionErrorInListener01.await()?.let { throw it }

// cleanup
removeListenerAndUnsubscribe.close()
// cleanup
removeListenerAndUnsubscribe?.close()
}
}

private fun getDeletedActionMap() = mapOf(
Expand Down

0 comments on commit 8294d6b

Please sign in to comment.