Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-7435 Add serialization for SSE #4363

Merged
merged 34 commits into from
Nov 15, 2024

Conversation

marychatte
Copy link
Member

Subsystem
Client SSE, Server SSE

Motivation
KTOR-7435

@marychatte marychatte self-assigned this Oct 2, 2024
@marychatte marychatte requested review from bjhham and e5l October 2, 2024 14:50
Copy link
Contributor

@bjhham bjhham left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the comments about including TypeInfo in the default config so that we can use different types in our sessions.

Comment on lines 34 to 40
public suspend fun <T : Any> HttpClient.serverSentEventsSession(
deserialize: ((String) -> T)? = null,
reconnectionTime: Duration? = null,
showCommentEvents: Boolean? = null,
showRetryEvents: Boolean? = null,
block: HttpRequestBuilder.() -> Unit
): ClientSSESession {
): ClientSSESession<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider using overloads instead of the default deserialize argument here to allow the original API sans type argument when using strings?

By that, I mean something like...

public suspend fun HttpClient.sseSession(
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean? = null,
    showRetryEvents: Boolean? = null,
    block: HttpRequestBuilder.() -> Unit
) = sseSession({ it }, reconnectionTime, showCommentEvents, showRetryEvents, block)

public suspend fun <T : Any> HttpClient.sseSession(
    deserialize: (String) -> T,
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean? = null,
    showRetryEvents: Boolean? = null,
    block: HttpRequestBuilder.() -> Unit
)

This would make it so the default deserialization can't be used, however.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe ignore this suggestion and just include typeInfo in the default lambda signature. We'll probably want some logic to ensure that when String is supplied as the type argument that we avoid any serialization, otherwise you'd end up with some weirdness with it expecting strings to be wrapped in quotes.

@@ -13,6 +13,7 @@ import kotlin.time.Duration.Companion.milliseconds
public class SSEConfig {
internal var showCommentEvents = false
internal var showRetryEvents = false
internal var deserialize: (String) -> Any = { s -> s }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we're missing the typeInfo here required to solve the general use-case for deserializing different types. I think it may need to be (TypeInfo) -> (String) -> Any so when using a StringFormat like Json, we can supply it like deserialize = { typeInfo -> Json.serializersModule.serializer(typeInfo).let { serializer -> {{ Json.decodeFromString(serializer, it) }} } }. Rather ugly, but we could provide some helpers somewhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the API side, this will need to use reified T and typeOf<T>() to pass to the session.

Comment on lines 552 to 542
client.sse<Customer>({
url("$TEST_SERVER/sse/json")
}) {
incoming.single().apply {
assertEquals(1, data?.id)
assertEquals("Jet", data?.firstName)
assertEquals("Brains", data?.lastName)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to see this work with different types on the same server.

Comment on lines 18 to 24
public class ServerSentEvent<T>(
public val data: T? = null,
public val event: String? = null,
public val id: String? = null,
public val retry: Long? = null,
public val comments: String? = null
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice if this was a data class, so instead of having:

incoming.collect { person ->
   println(person.data)
}

We could have

incoming.collect { (person) ->
    println(person)
}

@@ -12,16 +12,16 @@ import kotlinx.coroutines.flow.*
/**
* A Server-sent events session.
*/
public interface SSESession : CoroutineScope {
public interface SSESession<T> : CoroutineScope {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep SSESession without generic types and introduce a new type for the session with deserialization. It will be more explicit and simplify maintenance in the long run

@marychatte
Copy link
Member Author

@e5l @bjhham Thanks for the comments, please take a look at the new commits

Right now, for the client-side:

  1. Two types of sessions: SSESession and SSESessionWithDeserialization:
public interface SSESession : CoroutineScope {
    public val incoming: Flow<ServerSentEvent<String>>
}

public interface SSESessionWithDeserialization: SSESession {
    public val deserializer: (TypeInfo) -> (String) -> Any
}
  1. The difference in builder functions is in the parameter deserialize. This parameter is required for all builder functions for SSESessionWithDeserialization: deserialize: (TypeInfo) -> (String) -> Any
  2. Example of SSESessionWithDeserialization usage:
@Serializable
data class Customer(val id: Int, val firstName: String, val lastName: String)
@Serializable
data class Product(val name: String, val price: Int)

client.sse({
    url("$TEST_SERVER/sse/json")
}, deserialize = { typeInfo: TypeInfo ->
    { jsonString: String ->
         val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
         Json.decodeFromString(serializer, jsonString) ?: Exception()
     }
}) {
    incoming.collect { event ->
        if (customer) {
            val customer = deserialize<Customer>(event.data)
            assertEquals("Jet", customer?.firstName)
        } else {
           val product = deserialize<Product>(event.data)
           assertEquals(100, product?.price)
        }
   }
}

Example of SSESession usage:

val session = client.serverSentEventsSession("$TEST_SERVER/sse/hello")
val event: ServerSentEvent<String> = session.incoming.single()
assertEquals("0", event.id)

For the server-side:

  1. Also two types of sessions, SSESession and SSESessionWithSerialization:
public interface SSESession : CoroutineScope {
    public val call: ApplicationCall

    public suspend fun send(event: ServerSentEvent<String>)
}

public interface SSESessionWithSerialization : SSESession {
    public val serializer: (TypeInfo) -> (Any) -> String
}
public suspend inline fun <reified T : Any> SSESessionWithSerialization.sendSerialized(event: ServerSentEvent<T>) {
    send(
        ServerSentEvent(
            event.data?.let {
                serializer(typeInfo<T>()).invoke(it)
            },
            event.event,
            event.id,
            event.retry,
            event.comments
        )
    )
}
  1. Parameter serialize is required for all builder functions for SSESessionWithSerialization: serialize: (TypeInfo) -> (Any) -> String
  2. Example of SSESession usage:
sse("/hello") {
      send(ServerSentEvent("world", event = "send", id = "100", retry = 1000, comments = "comment"))
}

Example of SSESessionWithSerialization usage:

class Person1(val age: Int)
class Person2(val number: Int)
sse(serialize = {
    typeInfo ->
    { data ->
        when (typeInfo.type) {
            Person1::class -> {
                "Age ${(data as Person1).age}"
            }

            Person2::class -> {
                "Number ${(data as Person2).number}"
            }

            else -> {
                data.toString()
            }
        }
    }
}) {
    sendSerialized(Person1(22))
    sendSerialized(Person2(123456))
}

Questions:

  1. Is configuring the serialization/deserialization functions only through function parameters and not SSE config okay?
  2. Are we agreed on the types for serialization/deserialization functions deserialize: (TypeInfo) -> (String) -> Any and serialize: (TypeInfo) -> (Any) -> String?

@marychatte marychatte requested review from e5l and bjhham October 7, 2024 12:37
public class ServerSentEvent(
public val data: String? = null,
public data class ServerSentEvent<T>(
public val data: T? = null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also split this class in 2 so String would be default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split into ServerSentEvent and ParameterizedServerSentEvent<T>

/**
* Deserializer for transforming field `data` of `ServerSentEvent` into desired data object.
*/
public val deserializer: (TypeInfo) -> (String) -> Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you tell me why it's not just fun deserialize(TypeInfo, String) -> Any?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Deserializer and Serializer interface:

public interface Deserializer {
    public fun deserialize(typeInfo: TypeInfo, data: String): Any
}

Do we want to make them functional because now it's used like:

client.sse(
    { url("$TEST_SERVER/sse/person") },
    object : Deserializer {
        override fun deserialize(typeInfo: TypeInfo, data: String): Any {
            return Person1(data)
    }
}) {
    incoming.single().apply {
        assertEquals("Name 0", deserialize<Person1>(data)?.name)
    }
}

@@ -31,7 +32,7 @@ public data object SSECapability : HttpClientEngineCapability<Unit>
* val client = HttpClient {
* install(SSE)
* }
* client.sse {
* client.sse<String> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid having generics on the sse method

@marychatte marychatte requested a review from e5l October 8, 2024 17:12
Comment on lines 90 to 98
public suspend inline fun <reified T : Any> SSESessionWithSerialization.sendSerialized(
data: T? = null,
event: String? = null,
id: String? = null,
retry: Long? = null,
comments: String? = null
) {
sendSerialized(ParameterizedServerSentEvent(data, event, id, retry, comments))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just overload send here to avoid the wordiness of sendSerialized.

Comment on lines 104 to 100
/**
* Serializer interface for transforming data object into field `data` of `ServerSentEvent`.
*/
public interface Serializer {

/**
* Transforms data object into field `data` of `ServerSentEvent`.
*/
public fun serialize(typeInfo: TypeInfo, data: Any): String
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just use the lambda expression (typeInfo, data) -> String for the serializer field, since the argument provides context and this interface clutters up the API.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e5l suggested using an interface in case we would need to change the API or add something. But (typeInfo, data) -> String seems like final thing

Comment on lines 41 to 81
public data class ParameterizedServerSentEvent<T>(
public val data: T? = null,
public val event: String? = null,
public val id: String? = null,
public val retry: Long? = null,
public val comments: String? = null
) {
@InternalAPI
public fun toString(serializer: (T) -> String): String =
eventToString(data?.let { serializer(it) }, event, id, retry, comments)
}

private fun eventToString(data: String?, event: String?, id: String?, retry: Long?, comments: String?): String {
return buildString {
appendField("data", data)
appendField("event", event)
appendField("id", id)
appendField("retry", retry)
appendField("", comments)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with something like:

sealed interface ServerSentEventMetadata<T> {
    val data: T?
    val event: String?
    val id: String?
    val retry: Long?
    val comments: String?
}

data class ServerSentEvent(...): ServerSentEventMetadata<String>
data class ServerSentEventParsed(...): ServerSentEventMetadata<T>

Comment on lines 449 to 463
client.sse(
{
url("$TEST_SERVER/sse/person")
parameter("times", count)
},
deserialize = object : Deserializer {
override fun deserialize(typeInfo: TypeInfo, data: String): Any {
return Person(data)
}
}
) {
incoming.collectIndexed { i, event ->
val person = deserialize<Person>(event)
assertEquals("Name $i", person?.name)
assertEquals("$i", event.id)
size++
}
}
assertEquals(count, size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the removal of the default deserializer in the client config, I think it would simplify things to just go back to (String) -> E as the argument for the session and have the incoming return the parsed events from incoming.

Copy link
Member Author

@marychatte marychatte Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean have incoming: Flow<ServerSentEventParsed<*>>?

Comment on lines 236 to 252
@Test
fun testDifferentSerializers() = testApplication {
install(SSE)
routing {
sse(object : Serializer {
override fun serialize(typeInfo: TypeInfo, data: Any): String {
return when (typeInfo.type) {
Person1::class -> {
"Age ${(data as Person1).age}"
}

Person2::class -> {
"Number ${(data as Person2).number}"
}

else -> {
data.toString()
}
}
}
}) {
sendSerialized(Person1(22))
sendSerialized(Person2(123456))
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be interested in seeing an example with kotlinx-serialization-json, since this is probably the most common use-case for this feature. We ought to shape the API so that this is easy to implement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is used kotlinx-serialization-json

@marychatte marychatte force-pushed the marychatte/KTOR-7435-Add-serialization-for-SSE branch from 9ce8ec5 to 19f01fe Compare October 10, 2024 17:15
/**
* An incoming server-sent events flow.
*/
public val incoming: Flow<ServerSentEventParsed<String>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's think about naming. ServerSentEventParsed looks inconsistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about TypedServerSentEvent?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

/**
* Deserializer for transforming field `data` of `ServerSentEvent` into desired data object.
*/
public val deserializer: (TypeInfo, String) -> Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we avoid functional fields?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e5l It was an interface, but after #4363 (comment) I reverted to the functional field

@marychatte
Copy link
Member Author

@e5l @bjhham Please, take a look. Right now, there are two important things to discuss:

- Do we want to introduce functional fields or interfaces for Deserializer and Serializer?

With interfaces it may be easier to change the API in the future, but it consists only of one function, and I don't see potential ways for changing. And with functional fields it's easier for users to implement. For example:

  1. Interfaces
client.sse(
    { URL(...) },
    object : Deserializer {
        override fun deserialize(typeInfo: TypeInfo, data: String): Any {
            val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
            Json.decodeFromString(serializer, data) ?: Exception()
        } 
    }
) {
    ...
}
  1. Functional fields
client.sse(
    { URL(...) },
    { typeInfo, data ->
        val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
        Json.decodeFromString(serializer, data) ?: Exception()
    }
) {
    ...
}

I prefer the functional fields approach

- What type should incoming channel of SSESessionWithDeserialization have?

Suggested variant:

 public val incoming: Flow<TypedServerSentEvent<String>>

So during the session to get the typed event, we will need to call deserialize function:

client.sse({
    url(...)
}, deserialize = {
    typeInfo, jsonString ->
    val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
    Json.decodeFromString(serializer, jsonString) ?: Exception()
}) {
    incoming.collect { event: TypedServerSentEvent<String> ->
        when (event.event) {
            "customer" -> {
                val customer: Customer? = deserialize<Customer>(event.data)
                ...
            }
            "product" -> {
                val product: Product? = deserialize<Product>(event.data)
                ...
            }
        }
    }
}

@marychatte marychatte requested review from e5l and bjhham November 6, 2024 12:40
Copy link
Contributor

@bjhham bjhham left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

It would be nice to have some bridge code for the JSON scenario where you don't have to worry about the deserialization args, though we don't really have a precedent for convenience libraries for specific plugins... maybe it could be introduced to the sample code in the registry, idk, food for thought.

Also, please rebase on 3.1 EAP

Copy link
Member

@e5l e5l left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide more descriptive API docs with usage examples, edge cases, and references

@marychatte marychatte changed the base branch from main to 3.1.0-eap November 6, 2024 15:26
@@ -6,11 +6,23 @@ package io.ktor.client.plugins.sse

import io.ktor.client.call.*
import io.ktor.sse.*
import io.ktor.util.reflect.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/**
* A Server-sent events session.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider adding a reference on what SSE is on the wiki page

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it also would be great to highlight where the receiver type is used in the example

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated KDoc to:
A session for handling Server-Sent Events (SSE) from a server.

Example of usage:

client.sse("http://localhost:8080/sse") { // `this` is `ClientSSESession`
    incoming.collect { event ->
        println("Id: ${event.id}")
        println("Event: ${event.event}")
        println("Data: ${event.data}")
    }
}

To learn more, see the SSE and the SSE specification.

@e5l e5l requested a review from vnikolova November 7, 2024 07:53
@e5l
Copy link
Member

e5l commented Nov 7, 2024

@vnikolova, could you help us check KDocs?

Copy link
Contributor

@vnikolova vnikolova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KDocs look clear - I made some minor suggestions.
I'm not sure if it's a common practice, but it might be useful to add comments in the code examples for added clarity.

@marychatte marychatte force-pushed the marychatte/KTOR-7435-Add-serialization-for-SSE branch from 5e30c20 to 2ee64f0 Compare November 7, 2024 12:11
marychatte and others added 25 commits November 14, 2024 13:10
…ktor/server/sse/Routing.kt

Co-authored-by:  Vik Nikolova <[email protected]>
…ktor/server/sse/Routing.kt

Co-authored-by:  Vik Nikolova <[email protected]>
…ktor/server/sse/SSESession.kt

Co-authored-by:  Vik Nikolova <[email protected]>
@marychatte marychatte force-pushed the marychatte/KTOR-7435-Add-serialization-for-SSE branch from 0502a51 to dbef334 Compare November 14, 2024 12:10
@marychatte marychatte merged commit 4016419 into 3.1.0-eap Nov 15, 2024
11 of 14 checks passed
@marychatte marychatte deleted the marychatte/KTOR-7435-Add-serialization-for-SSE branch November 15, 2024 12:28
osipxd pushed a commit that referenced this pull request Nov 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants