Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parent event id for echo #3

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# WebSocket Client v0.2.2
# WebSocket Client v0.2.3

This microservice allows sending and receiving messages via WebSocket protocol

Expand Down Expand Up @@ -69,7 +69,7 @@ metadata:
name: ws-client
spec:
image-name: ghcr.io/th2-net/th2-conn-ws-client
image-version: 0.2.1
image-version: 0.2.3
custom-config:
uri: wss://echo.websocket.org
sessionAlias: api_session
Expand Down Expand Up @@ -101,6 +101,12 @@ spec:

## Changelog

### v0.2.3

#### Added

* retain original `parentEventId` when storing an outgoing message

### v0.2.2

#### Added:
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ clean {
dependencies {
api platform('com.exactpro.th2:bom:3.0.0')

implementation 'com.exactpro.th2:common:3.13.4'
implementation 'com.exactpro.th2:common:3.18.2'
implementation 'com.exactpro.th2:grpc-conn:0.0.1'

implementation 'org.slf4j:slf4j-log4j12'
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
kotlin.code.style=official
kotlin_version=1.4.10
release_version=0.2.2
release_version=0.2.3
description='Websocket Client'
vcs_url=https://github.com/th2-net/th2-conn-ws-client
14 changes: 8 additions & 6 deletions src/main/kotlin/com/exactpro/th2/ws/client/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package com.exactpro.th2.ws.client

import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.event.EventUtils
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.schema.factory.CommonFactory
import com.exactpro.th2.common.schema.grpc.router.GrpcRouter
Expand Down Expand Up @@ -122,10 +124,10 @@ fun run(
val outgoingSequence = createSequence()

//TODO: add batching (by size or time)
val onMessage = { message: ByteArray, _: Boolean, direction: Direction ->
val onMessage = { message: ByteArray, textual: Boolean, direction: Direction, eventId: EventID? ->
val sequence = if (direction == Direction.FIRST) incomingSequence else outgoingSequence
val attribute = if (direction == Direction.FIRST) QueueAttribute.FIRST else QueueAttribute.SECOND
messageRouter.send(message.toBatch(connectionId, direction, sequence()), attribute.toString())
messageRouter.send(message.toBatch(connectionId, direction, sequence(), eventId), attribute.toString())
}

val onEvent = { cause: Throwable?, message: () -> String ->
Expand All @@ -152,7 +154,7 @@ fun run(
require(messagesCount == 1) { "Message group contains more than 1 message" }
val message = messagesList[0]
require(message.hasRawMessage()) { "Message in the group is not a raw message" }
settings.frameType.send(client, message.rawMessage.body.toByteArray())
settings.frameType.send(client, message.rawMessage.body.toByteArray(), message.rawMessage.parentEventId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message.rawMessage.parentEventId

It'll always be non-null. We should check its presence through hasParentEventId method instead and pass null if it returns false

}.recoverCatching {
LOGGER.error(it) { "Failed to handle message group: ${group.toPrettyString()}" }
eventRouter.storeEvent(rootEventId, "Failed to handle message group: ${group.toPrettyString()}", "Error", it)
Expand Down Expand Up @@ -194,13 +196,13 @@ data class Settings(
) {
enum class FrameType {
TEXT {
override fun send(client: IClient, data: ByteArray) = client.sendText(data.toString(UTF_8))
override fun send(client: IClient, data: ByteArray, eventId: EventID?) = client.sendText(data.toString(UTF_8), eventId)
},
BINARY {
override fun send(client: IClient, data: ByteArray) = client.sendBinary(data)
override fun send(client: IClient, data: ByteArray, eventId: EventID?) = client.sendBinary(data, eventId)
};

abstract fun send(client: IClient, data: ByteArray)
abstract fun send(client: IClient, data: ByteArray, eventId: EventID?)
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/kotlin/com/exactpro/th2/ws/client/api/IClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package com.exactpro.th2.ws.client.api

import com.exactpro.th2.common.grpc.EventID

interface IClient {
fun sendText(text: String)
fun sendBinary(data: ByteArray)
fun sendText(text: String, eventId: EventID? = null)
fun sendBinary(data: ByteArray, eventId: EventID? = null)
fun sendPing(message: ByteArray)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.exactpro.th2.ws.client.api.impl
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.Direction.FIRST
import com.exactpro.th2.common.grpc.Direction.SECOND
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.ws.client.api.IClient
import com.exactpro.th2.ws.client.api.IClientSettings
import com.exactpro.th2.ws.client.api.IHandler
Expand All @@ -35,7 +36,7 @@ import kotlin.concurrent.withLock
class WebSocketClient(
private val uri: URI,
private val handler: IHandler,
private val onMessage: (message: ByteArray, textual: Boolean, direction: Direction) -> Unit,
private val onMessage: (message: ByteArray, textual: Boolean, direction: Direction, eventId: EventID?) -> Unit,
private val onEvent: (cause: Throwable?, message: () -> String) -> Unit
) : IClient, WebSocket.Listener {
private val logger = KotlinLogging.logger {}
Expand All @@ -60,18 +61,18 @@ class WebSocketClient(
return socket
}

override fun sendText(text: String) {
override fun sendText(text: String, eventId: EventID?) {
logger.debug { "Sending text: $text" }
val preparedText = handler.prepareText(this, text)
awaitSocket().sendText(preparedText, true)
onMessage(preparedText.toByteArray(), true, SECOND)
onMessage(preparedText.toByteArray(), true, SECOND, eventId)
}

override fun sendBinary(data: ByteArray) {
override fun sendBinary(data: ByteArray, eventId: EventID?) {
logger.debug { "Sending binary: ${data.toBase64()}" }
val preparedData = handler.prepareBinary(this, data)
awaitSocket().sendBinary(ByteBuffer.wrap(preparedData), true)
onMessage(preparedData, false, SECOND)
onMessage(preparedData, false, SECOND, eventId)
}

override fun sendPing(message: ByteArray) {
Expand Down Expand Up @@ -100,7 +101,7 @@ class WebSocketClient(
if (last) {
val message = if (textFrames.isEmpty()) frame else textFrames.joinToString("")
handler.onText(this, message)
onMessage(message.toByteArray(), true, FIRST)
onMessage(message.toByteArray(), true, FIRST, null)
textFrames.clear()
}

Expand All @@ -121,7 +122,7 @@ class WebSocketClient(
if (last) {
val message = if (binaryFrames.isEmpty()) frame else binaryFrames.reduce(ByteArray::plus)
handler.onBinary(this, message)
onMessage(message, false, FIRST)
onMessage(message, true, FIRST, null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
onMessage(message, true, FIRST, null)
onMessage(message, false, FIRST, null)

binaryFrames.clear()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package com.exactpro.th2.ws.client.util
import com.exactpro.th2.common.grpc.AnyMessage
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.grpc.MessageGroup
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.grpc.RawMessage
Expand All @@ -44,8 +45,10 @@ fun ByteArray.toBatch(
connectionId: ConnectionID,
direction: Direction,
sequence: Long,
parentEventId: EventID?
): MessageGroupBatch = RawMessage.newBuilder().apply {
this.body = ByteString.copyFrom(this@toBatch)
parentEventId?.let { this.parentEventId = it }
this.metadataBuilder {
this.timestamp = Instant.now().toTimestamp()
this.idBuilder {
Expand Down