Skip to content

Commit

Permalink
Mc/restore (#56)
Browse files Browse the repository at this point in the history
* Added message restore method.

* Added Chat.init test
---------
Co-authored-by: Wojtek Kaliciński <[email protected]>
  • Loading branch information
marcin-cebo authored Sep 3, 2024
1 parent 8941fef commit d44d70f
Show file tree
Hide file tree
Showing 17 changed files with 292 additions and 46 deletions.
7 changes: 7 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ kotlin {
implementation("com.pubnub:pubnub-kotlin-impl:9.2-DEV")
}
}

val commonTest by getting {
dependencies {
implementation(kotlin("test"))
implementation("com.pubnub:pubnub-kotlin-test")
}
}
}

if (enableAnyIosTarget) {
Expand Down
1 change: 1 addition & 0 deletions pubnub-chat-api/api/pubnub-chat-api.api
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public abstract interface class com/pubnub/chat/Message {
public abstract fun pin ()Lcom/pubnub/kmp/PNFuture;
public abstract fun removeThread ()Lcom/pubnub/kmp/PNFuture;
public abstract fun report (Ljava/lang/String;)Lcom/pubnub/kmp/PNFuture;
public abstract fun restore ()Lcom/pubnub/kmp/PNFuture;
public abstract fun streamUpdates (Lkotlin/jvm/functions/Function1;)Ljava/lang/AutoCloseable;
public abstract fun toggleReaction (Ljava/lang/String;)Lcom/pubnub/kmp/PNFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,7 @@ interface Message {
// todo do we want to have test for this?
fun <T : Message> streamUpdates(callback: (message: T) -> Unit): AutoCloseable

fun restore(): PNFuture<Message>

companion object
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.pubnub.api.models.consumer.PNBoundedPage
import com.pubnub.api.models.consumer.PNPublishResult
import com.pubnub.api.models.consumer.history.PNFetchMessageItem
import com.pubnub.api.models.consumer.history.PNFetchMessagesResult
import com.pubnub.api.models.consumer.message_actions.PNMessageAction
import com.pubnub.api.models.consumer.message_actions.PNRemoveMessageActionResult
import com.pubnub.api.models.consumer.objects.PNKey
import com.pubnub.api.models.consumer.objects.PNMembershipKey
Expand Down Expand Up @@ -72,6 +73,7 @@ import com.pubnub.chat.internal.error.PubNubErrorMessage.THERE_IS_NO_ACTION_TIME
import com.pubnub.chat.internal.error.PubNubErrorMessage.THERE_IS_NO_THREAD_TO_BE_DELETED
import com.pubnub.chat.internal.error.PubNubErrorMessage.THERE_IS_NO_THREAD_WITH_ID
import com.pubnub.chat.internal.error.PubNubErrorMessage.THIS_MESSAGE_IS_NOT_A_THREAD
import com.pubnub.chat.internal.error.PubNubErrorMessage.THIS_THREAD_ID_ALREADY_RESTORED
import com.pubnub.chat.internal.error.PubNubErrorMessage.THREAD_FOR_THIS_MESSAGE_ALREADY_EXISTS
import com.pubnub.chat.internal.error.PubNubErrorMessage.USER_ID_ALREADY_EXIST
import com.pubnub.chat.internal.error.PubNubErrorMessage.USER_NOT_EXIST
Expand Down Expand Up @@ -182,6 +184,27 @@ class ChatImpl(
type = user.type
)

override fun restoreThreadChannel(message: Message): PNFuture<PNMessageAction?> {
val threadChannelId = getThreadId(message.channelId, message.timetoken)
return getChannel(threadChannelId).thenAsync { channel: Channel? ->
if (channel == null) {
null.asFuture()
} else {
if (message.actions?.get(THREAD_ROOT_ID)?.get(threadChannelId)?.isNotEmpty() == true) {
log.pnError(THIS_THREAD_ID_ALREADY_RESTORED)
}

val messageAction = PNMessageAction(
type = THREAD_ROOT_ID,
value = threadChannelId,
messageTimetoken = message.timetoken
)
pubNub.addMessageAction(channel = message.channelId, messageAction = messageAction)
// we don't update action map here but we do this in message#restore()
}
}
}

override fun createUser(
id: String,
name: String?,
Expand All @@ -205,6 +228,34 @@ class ChatImpl(
}
}

override fun removeThreadChannel(
chat: Chat,
message: Message,
soft: Boolean
): PNFuture<Pair<PNRemoveMessageActionResult, Channel>> {
if (!message.hasThread) {
return PubNubException(THERE_IS_NO_THREAD_TO_BE_DELETED).logErrorAndReturnException(log).asFuture()
}

val threadId = getThreadId(message.channelId, message.timetoken)

val actionTimetoken =
message.actions?.get(THREAD_ROOT_ID)?.get(threadId)?.get(0)?.actionTimetoken
?: return PubNubException(THERE_IS_NO_ACTION_TIMETOKEN_CORRESPONDING_TO_THE_THREAD).logErrorAndReturnException(
log
).asFuture()

return chat.getChannel(threadId).thenAsync { threadChannel ->
if (threadChannel == null) {
log.pnError("$THERE_IS_NO_THREAD_WITH_ID$threadId")
}
awaitAll(
chat.pubNub.removeMessageAction(message.channelId, message.timetoken, actionTimetoken),
threadChannel.delete(soft)
)
}
}

override fun getUser(userId: String): PNFuture<User?> {
if (!isValidId(userId)) {
return log.logErrorAndReturnException(ID_IS_REQUIRED).asFuture()
Expand Down Expand Up @@ -1139,34 +1190,6 @@ class ChatImpl(
).asFuture()
}
}

internal fun removeThreadChannel(
chat: Chat,
message: Message,
soft: Boolean = false
): PNFuture<Pair<PNRemoveMessageActionResult, Channel>> {
if (!message.hasThread) {
return PubNubException(THERE_IS_NO_THREAD_TO_BE_DELETED).logErrorAndReturnException(log).asFuture()
}

val threadId = getThreadId(message.channelId, message.timetoken)

val actionTimetoken =
message.actions?.get("threadRootId")?.get(threadId)?.get(0)?.actionTimetoken
?: return PubNubException(THERE_IS_NO_ACTION_TIMETOKEN_CORRESPONDING_TO_THE_THREAD).logErrorAndReturnException(
log
).asFuture()

return chat.getChannel(threadId).thenAsync { threadChannel ->
if (threadChannel == null) {
log.pnError("$THERE_IS_NO_THREAD_WITH_ID$threadId")
}
awaitAll(
chat.pubNub.removeMessageAction(message.channelId, message.timetoken, actionTimetoken),
threadChannel.delete(soft)
)
}
}
}

private fun storeUserActivityTimestamp(): PNFuture<Unit> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.pubnub.chat.internal

import com.pubnub.api.models.consumer.message_actions.PNMessageAction
import com.pubnub.api.models.consumer.message_actions.PNRemoveMessageActionResult
import com.pubnub.chat.Channel
import com.pubnub.chat.Chat
import com.pubnub.chat.Message
import com.pubnub.chat.User
import com.pubnub.kmp.PNFuture

Expand All @@ -9,4 +13,12 @@ interface ChatInternal : Chat {
val deleteMessageActionName: String

fun createUser(user: User): PNFuture<User>

fun removeThreadChannel(
chat: Chat,
message: Message,
soft: Boolean = false
): PNFuture<Pair<PNRemoveMessageActionResult, Channel>>

fun restoreThreadChannel(message: Message): PNFuture<PNMessageAction?>
}
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ abstract class BaseChannel<C : Channel, M : Message>(
val newChannel = previousChannel?.plus(message.data) ?: ChannelImpl.fromDTO(chat, message.data)
newChannel to newChannelId
}

is PNDeleteChannelMetadataEventMessage -> null to message.channel
else -> return@createEventListener
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.pubnub.chat.ThreadMessage
import com.pubnub.chat.internal.ChatImpl
import com.pubnub.chat.internal.ChatInternal
import com.pubnub.chat.internal.DELETED
import com.pubnub.chat.internal.THREAD_ROOT_ID
import com.pubnub.chat.internal.error.PubNubErrorMessage.PARENT_CHANNEL_DOES_NOT_EXISTS
import com.pubnub.chat.internal.message.ThreadMessageImpl
import com.pubnub.chat.internal.util.pnError
Expand Down Expand Up @@ -74,6 +75,10 @@ data class ThreadChannelImpl(
}
}

override fun delete(soft: Boolean): PNFuture<Channel> {
return chat.removeThreadChannel(chat, parentMessage, soft).then { it.second }
}

override fun sendText(
text: String,
meta: Map<String, Any>?,
Expand All @@ -93,7 +98,7 @@ data class ThreadChannelImpl(
chat.pubNub.addMessageAction(
parentMessage.channelId,
PNMessageAction(
"threadRootId",
THREAD_ROOT_ID,
id,
parentMessage.timetoken
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ internal object PubNubErrorMessage {
internal const val THREAD_FOR_THIS_MESSAGE_ALREADY_EXISTS = "Thread for this message already exists."
internal const val RECEIPT_EVENT_WAS_NOT_SENT_TO_CHANNEL = "Because PAM did not allow it 'receipt' event was not sent to channel: "
internal const val ERROR_HANDLING_ONMESSAGE_EVENT = "Error handling onMessage event"
internal const val THIS_MESSAGE_HAS_NOT_BEEN_DELETED = "This message has not been deleted"
internal const val THIS_THREAD_ID_ALREADY_RESTORED = "This thread is already restored"
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package com.pubnub.chat.internal.message

import com.pubnub.api.JsonElement
import com.pubnub.api.PubNubException
import com.pubnub.api.asMap
import com.pubnub.api.endpoints.message_actions.RemoveMessageAction
import com.pubnub.api.models.consumer.PNBoundedPage
import com.pubnub.api.models.consumer.PNPublishResult
import com.pubnub.api.models.consumer.history.PNFetchMessageItem
import com.pubnub.api.models.consumer.message_actions.PNAddMessageActionResult
import com.pubnub.api.models.consumer.message_actions.PNGetMessageActionsResult
import com.pubnub.api.models.consumer.message_actions.PNMessageAction
import com.pubnub.chat.Channel
import com.pubnub.chat.Message
Expand All @@ -20,10 +24,13 @@ import com.pubnub.chat.internal.THREAD_ROOT_ID
import com.pubnub.chat.internal.channel.ChannelImpl
import com.pubnub.chat.internal.error.PubNubErrorMessage
import com.pubnub.chat.internal.error.PubNubErrorMessage.CANNOT_STREAM_MESSAGE_UPDATES_ON_EMPTY_LIST
import com.pubnub.chat.internal.error.PubNubErrorMessage.THIS_MESSAGE_HAS_NOT_BEEN_DELETED
import com.pubnub.chat.internal.serialization.PNDataEncoder
import com.pubnub.chat.internal.util.logWarnAndReturnException
import com.pubnub.chat.internal.util.pnError
import com.pubnub.chat.types.EventContent
import com.pubnub.chat.types.File
import com.pubnub.chat.types.MessageActionType
import com.pubnub.chat.types.MessageMentionedUsers
import com.pubnub.chat.types.MessageReferencedChannels
import com.pubnub.chat.types.QuotedMessage
Expand Down Expand Up @@ -56,7 +63,7 @@ abstract class BaseMessage<T : Message>(
override val text: String
get() {
val edits = actions?.get(chat.editMessageActionName) ?: return content.text
val flatEdits = edits.mapValues { it.value.first() }
val flatEdits = edits.filterValues { it.isNotEmpty() }.mapValues { it.value.first() }
val lastEdit = flatEdits.entries.reduce { acc, entry ->
if (acc.value.actionTimetoken > entry.value.actionTimetoken) {
acc
Expand All @@ -68,7 +75,7 @@ abstract class BaseMessage<T : Message>(
}

override val deleted: Boolean
get() = actions?.get(chat.deleteMessageActionName)?.get(chat.deleteMessageActionName)?.isNotEmpty() ?: false
get() = getDeleteActions() != null

override val hasThread: Boolean
get() {
Expand All @@ -84,7 +91,8 @@ abstract class BaseMessage<T : Message>(
override val files: List<File>
get() = content.files ?: emptyList()

override val reactions get() = actions?.get(com.pubnub.chat.types.MessageActionType.REACTIONS.toString()) ?: emptyMap()
override val reactions: Map<String, List<PNFetchMessageItem.Action>>
get() = actions?.get(MessageActionType.REACTIONS.toString()) ?: emptyMap()

override val textLinks: List<TextLink>? get() = (
meta?.get(
Expand Down Expand Up @@ -188,13 +196,14 @@ abstract class BaseMessage<T : Message>(

override fun createThread(): PNFuture<ThreadChannel> = ChatImpl.createThreadChannel(chat, this)

override fun removeThread() = ChatImpl.removeThreadChannel(chat, this)
override fun removeThread() = chat.removeThreadChannel(chat, this)

override fun toggleReaction(reaction: String): PNFuture<Message> {
val existingReaction = reactions[reaction]?.find {
it.uuid == chat.currentUser.id
}
val messageAction = PNMessageAction(com.pubnub.chat.types.MessageActionType.REACTIONS.toString(), reaction, timetoken)
val messageAction =
PNMessageAction(MessageActionType.REACTIONS.toString(), reaction, timetoken)
val newActions = if (existingReaction != null) {
chat.pubNub.removeMessageAction(channelId, timetoken, existingReaction.actionTimetoken.toLong())
.then { filterAction(actions, messageAction) }
Expand All @@ -205,7 +214,57 @@ abstract class BaseMessage<T : Message>(
return newActions.then { copyWithActions(it) }
}

override fun <M : Message> streamUpdates(callback: (message: M) -> Unit): AutoCloseable {
return streamUpdatesOn(listOf(this as M)) {
callback(it.first())
}
}

override fun restore(): PNFuture<Message> {
val deleteActions: List<PNFetchMessageItem.Action> = getDeleteActions()
?: return PubNubException(THIS_MESSAGE_HAS_NOT_BEEN_DELETED).logWarnAndReturnException(log).asFuture()

var updatedActions: Actions? = actions?.filterNot {
it.key == chat.deleteMessageActionName
}

return deleteActions
.map { removeMessageAction(it.actionTimetoken) }
.awaitAll()
.thenAsync {
// get messageAction for all messages in channel
chat.pubNub.getMessageActions(channel = channelId, page = PNBoundedPage(end = timetoken))
}.then { pnGetMessageActionsResult: PNGetMessageActionsResult ->
// getMessageAction assigned to this message
val messageActionsForMessage = pnGetMessageActionsResult.actions.filter { it.messageTimetoken == timetoken }

// update actions map
messageActionsForMessage.forEach { pnMessageAction ->
updatedActions = assignAction(updatedActions, pnMessageAction)
}
}.thenAsync {
chat.restoreThreadChannel(this)
}.then { pnMessageAction: PNMessageAction? ->
// update actions map
pnMessageAction?.let { updatedActions = assignAction(updatedActions, it) }
copyWithActions(updatedActions)
}
}

private fun removeMessageAction(deleteActionTimetoken: Long): RemoveMessageAction {
return chat.pubNub.removeMessageAction(
channel = channelId,
messageTimetoken = timetoken,
actionTimetoken = deleteActionTimetoken
)
}

private fun getDeleteActions(): List<PNFetchMessageItem.Action>? {
return actions?.get(chat.deleteMessageActionName)?.get(chat.deleteMessageActionName)
}

private fun deleteThread(soft: Boolean): PNFuture<Unit> {
// todo check on server, discuss with Team
if (hasThread) {
return getThread().thenAsync {
it.delete(soft)
Expand All @@ -222,13 +281,7 @@ abstract class BaseMessage<T : Message>(
)
}

internal abstract fun copyWithActions(actions: Actions): T

override fun <M : Message> streamUpdates(callback: (message: M) -> Unit): AutoCloseable {
return streamUpdatesOn(listOf(this as M)) {
callback(it.first())
}
}
internal abstract fun copyWithActions(actions: Actions?): T

companion object {
private val log = logging()
Expand All @@ -244,7 +297,8 @@ abstract class BaseMessage<T : Message>(
val chat = messages.first().chat
val listener = createEventListener(chat.pubNub, onMessageAction = { _, event ->
val message =
latestMessages.find { it.timetoken == event.messageAction.messageTimetoken } ?: return@createEventListener
latestMessages.find { it.timetoken == event.messageAction.messageTimetoken }
?: return@createEventListener
if (message.channelId != event.channel) {
return@createEventListener
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ data class MessageImpl(
referencedChannels = referencedChannels,
quotedMessage = quotedMessage
) {
override fun copyWithActions(actions: Actions): Message = copy(actions = actions)
override fun copyWithActions(actions: Actions?): Message = copy(actions = actions)

companion object {
internal fun fromDTO(chat: ChatInternal, pnMessageResult: PNMessageResult): Message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ data class ThreadMessageImpl(
quotedMessage = quotedMessage
),
ThreadMessage {
override fun copyWithActions(actions: Actions): ThreadMessage = copy(actions = actions)
override fun copyWithActions(actions: Actions?): ThreadMessage = copy(actions = actions)

companion object {
private val log = logging()
Expand Down
Loading

0 comments on commit d44d70f

Please sign in to comment.