-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KTOR-7435 Add serialization for SSE #4363
KTOR-7435 Add serialization for SSE #4363
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the comments about including TypeInfo
in the default config so that we can use different types in our sessions.
public suspend fun <T : Any> HttpClient.serverSentEventsSession( | ||
deserialize: ((String) -> T)? = null, | ||
reconnectionTime: Duration? = null, | ||
showCommentEvents: Boolean? = null, | ||
showRetryEvents: Boolean? = null, | ||
block: HttpRequestBuilder.() -> Unit | ||
): ClientSSESession { | ||
): ClientSSESession<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you consider using overloads instead of the default deserialize argument here to allow the original API sans type argument when using strings?
By that, I mean something like...
public suspend fun HttpClient.sseSession(
reconnectionTime: Duration? = null,
showCommentEvents: Boolean? = null,
showRetryEvents: Boolean? = null,
block: HttpRequestBuilder.() -> Unit
) = sseSession({ it }, reconnectionTime, showCommentEvents, showRetryEvents, block)
public suspend fun <T : Any> HttpClient.sseSession(
deserialize: (String) -> T,
reconnectionTime: Duration? = null,
showCommentEvents: Boolean? = null,
showRetryEvents: Boolean? = null,
block: HttpRequestBuilder.() -> Unit
)
This would make it so the default deserialization can't be used, however.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think maybe ignore this suggestion and just include typeInfo
in the default lambda signature. We'll probably want some logic to ensure that when String
is supplied as the type argument that we avoid any serialization, otherwise you'd end up with some weirdness with it expecting strings to be wrapped in quotes.
@@ -13,6 +13,7 @@ import kotlin.time.Duration.Companion.milliseconds | |||
public class SSEConfig { | |||
internal var showCommentEvents = false | |||
internal var showRetryEvents = false | |||
internal var deserialize: (String) -> Any = { s -> s } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we're missing the typeInfo here required to solve the general use-case for deserializing different types. I think it may need to be (TypeInfo) -> (String) -> Any
so when using a StringFormat
like Json
, we can supply it like deserialize = { typeInfo -> Json.serializersModule.serializer(typeInfo).let { serializer -> {{ Json.decodeFromString(serializer, it) }} } }
. Rather ugly, but we could provide some helpers somewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the API side, this will need to use reified T
and typeOf<T>()
to pass to the session.
client.sse<Customer>({ | ||
url("$TEST_SERVER/sse/json") | ||
}) { | ||
incoming.single().apply { | ||
assertEquals(1, data?.id) | ||
assertEquals("Jet", data?.firstName) | ||
assertEquals("Brains", data?.lastName) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would like to see this work with different types on the same server.
public class ServerSentEvent<T>( | ||
public val data: T? = null, | ||
public val event: String? = null, | ||
public val id: String? = null, | ||
public val retry: Long? = null, | ||
public val comments: String? = null | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice if this was a data class, so instead of having:
incoming.collect { person ->
println(person.data)
}
We could have
incoming.collect { (person) ->
println(person)
}
@@ -12,16 +12,16 @@ import kotlinx.coroutines.flow.* | |||
/** | |||
* A Server-sent events session. | |||
*/ | |||
public interface SSESession : CoroutineScope { | |||
public interface SSESession<T> : CoroutineScope { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would keep SSESession without generic types and introduce a new type for the session with deserialization. It will be more explicit and simplify maintenance in the long run
@e5l @bjhham Thanks for the comments, please take a look at the new commits Right now, for the client-side:
public interface SSESession : CoroutineScope {
public val incoming: Flow<ServerSentEvent<String>>
}
public interface SSESessionWithDeserialization: SSESession {
public val deserializer: (TypeInfo) -> (String) -> Any
}
@Serializable
data class Customer(val id: Int, val firstName: String, val lastName: String)
@Serializable
data class Product(val name: String, val price: Int)
client.sse({
url("$TEST_SERVER/sse/json")
}, deserialize = { typeInfo: TypeInfo ->
{ jsonString: String ->
val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
Json.decodeFromString(serializer, jsonString) ?: Exception()
}
}) {
incoming.collect { event ->
if (customer) {
val customer = deserialize<Customer>(event.data)
assertEquals("Jet", customer?.firstName)
} else {
val product = deserialize<Product>(event.data)
assertEquals(100, product?.price)
}
}
} Example of val session = client.serverSentEventsSession("$TEST_SERVER/sse/hello")
val event: ServerSentEvent<String> = session.incoming.single()
assertEquals("0", event.id) For the server-side:
public interface SSESession : CoroutineScope {
public val call: ApplicationCall
public suspend fun send(event: ServerSentEvent<String>)
}
public interface SSESessionWithSerialization : SSESession {
public val serializer: (TypeInfo) -> (Any) -> String
}
public suspend inline fun <reified T : Any> SSESessionWithSerialization.sendSerialized(event: ServerSentEvent<T>) {
send(
ServerSentEvent(
event.data?.let {
serializer(typeInfo<T>()).invoke(it)
},
event.event,
event.id,
event.retry,
event.comments
)
)
}
sse("/hello") {
send(ServerSentEvent("world", event = "send", id = "100", retry = 1000, comments = "comment"))
} Example of class Person1(val age: Int)
class Person2(val number: Int)
sse(serialize = {
typeInfo ->
{ data ->
when (typeInfo.type) {
Person1::class -> {
"Age ${(data as Person1).age}"
}
Person2::class -> {
"Number ${(data as Person2).number}"
}
else -> {
data.toString()
}
}
}
}) {
sendSerialized(Person1(22))
sendSerialized(Person2(123456))
} Questions:
|
public class ServerSentEvent( | ||
public val data: String? = null, | ||
public data class ServerSentEvent<T>( | ||
public val data: T? = null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also split this class in 2 so String would be default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I split into ServerSentEven
t and ParameterizedServerSentEvent<T>
/** | ||
* Deserializer for transforming field `data` of `ServerSentEvent` into desired data object. | ||
*/ | ||
public val deserializer: (TypeInfo) -> (String) -> Any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you tell me why it's not just fun deserialize(TypeInfo, String) -> Any
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added Deserializer
and Serializer
interface:
public interface Deserializer {
public fun deserialize(typeInfo: TypeInfo, data: String): Any
}
Do we want to make them functional because now it's used like:
client.sse(
{ url("$TEST_SERVER/sse/person") },
object : Deserializer {
override fun deserialize(typeInfo: TypeInfo, data: String): Any {
return Person1(data)
}
}) {
incoming.single().apply {
assertEquals("Name 0", deserialize<Person1>(data)?.name)
}
}
@@ -31,7 +32,7 @@ public data object SSECapability : HttpClientEngineCapability<Unit> | |||
* val client = HttpClient { | |||
* install(SSE) | |||
* } | |||
* client.sse { | |||
* client.sse<String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would avoid having generics on the sse method
public suspend inline fun <reified T : Any> SSESessionWithSerialization.sendSerialized( | ||
data: T? = null, | ||
event: String? = null, | ||
id: String? = null, | ||
retry: Long? = null, | ||
comments: String? = null | ||
) { | ||
sendSerialized(ParameterizedServerSentEvent(data, event, id, retry, comments)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just overload send
here to avoid the wordiness of sendSerialized
.
/** | ||
* Serializer interface for transforming data object into field `data` of `ServerSentEvent`. | ||
*/ | ||
public interface Serializer { | ||
|
||
/** | ||
* Transforms data object into field `data` of `ServerSentEvent`. | ||
*/ | ||
public fun serialize(typeInfo: TypeInfo, data: Any): String | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can just use the lambda expression (typeInfo, data) -> String
for the serializer field, since the argument provides context and this interface clutters up the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@e5l suggested using an interface in case we would need to change the API or add something. But (typeInfo, data) -> String
seems like final thing
public data class ParameterizedServerSentEvent<T>( | ||
public val data: T? = null, | ||
public val event: String? = null, | ||
public val id: String? = null, | ||
public val retry: Long? = null, | ||
public val comments: String? = null | ||
) { | ||
@InternalAPI | ||
public fun toString(serializer: (T) -> String): String = | ||
eventToString(data?.let { serializer(it) }, event, id, retry, comments) | ||
} | ||
|
||
private fun eventToString(data: String?, event: String?, id: String?, retry: Long?, comments: String?): String { | ||
return buildString { | ||
appendField("data", data) | ||
appendField("event", event) | ||
appendField("id", id) | ||
appendField("retry", retry) | ||
appendField("", comments) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd go with something like:
sealed interface ServerSentEventMetadata<T> {
val data: T?
val event: String?
val id: String?
val retry: Long?
val comments: String?
}
data class ServerSentEvent(...): ServerSentEventMetadata<String>
data class ServerSentEventParsed(...): ServerSentEventMetadata<T>
client.sse( | ||
{ | ||
url("$TEST_SERVER/sse/person") | ||
parameter("times", count) | ||
}, | ||
deserialize = object : Deserializer { | ||
override fun deserialize(typeInfo: TypeInfo, data: String): Any { | ||
return Person(data) | ||
} | ||
} | ||
) { | ||
incoming.collectIndexed { i, event -> | ||
val person = deserialize<Person>(event) | ||
assertEquals("Name $i", person?.name) | ||
assertEquals("$i", event.id) | ||
size++ | ||
} | ||
} | ||
assertEquals(count, size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the removal of the default deserializer in the client config, I think it would simplify things to just go back to (String) -> E
as the argument for the session and have the incoming return the parsed events from incoming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean have incoming: Flow<ServerSentEventParsed<*>>
?
@Test | ||
fun testDifferentSerializers() = testApplication { | ||
install(SSE) | ||
routing { | ||
sse(object : Serializer { | ||
override fun serialize(typeInfo: TypeInfo, data: Any): String { | ||
return when (typeInfo.type) { | ||
Person1::class -> { | ||
"Age ${(data as Person1).age}" | ||
} | ||
|
||
Person2::class -> { | ||
"Number ${(data as Person2).number}" | ||
} | ||
|
||
else -> { | ||
data.toString() | ||
} | ||
} | ||
} | ||
}) { | ||
sendSerialized(Person1(22)) | ||
sendSerialized(Person2(123456)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be interested in seeing an example with kotlinx-serialization-json, since this is probably the most common use-case for this feature. We ought to shape the API so that this is easy to implement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is used kotlinx-serialization-json
Line 291 in 9ce8ec5
return Json.encodeToString(serializer, data) |
9ce8ec5
to
19f01fe
Compare
/** | ||
* An incoming server-sent events flow. | ||
*/ | ||
public val incoming: Flow<ServerSentEventParsed<String>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's think about naming. ServerSentEventParsed
looks inconsistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about TypedServerSentEvent
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
/** | ||
* Deserializer for transforming field `data` of `ServerSentEvent` into desired data object. | ||
*/ | ||
public val deserializer: (TypeInfo, String) -> Any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we avoid functional fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@e5l It was an interface, but after #4363 (comment) I reverted to the functional field
@e5l @bjhham Please, take a look. Right now, there are two important things to discuss: - Do we want to introduce functional fields or interfaces for
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
It would be nice to have some bridge code for the JSON scenario where you don't have to worry about the deserialization args, though we don't really have a precedent for convenience libraries for specific plugins... maybe it could be introduced to the sample code in the registry, idk, food for thought.
Also, please rebase on 3.1 EAP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please provide more descriptive API docs with usage examples, edge cases, and references
@@ -6,11 +6,23 @@ package io.ktor.client.plugins.sse | |||
|
|||
import io.ktor.client.call.* | |||
import io.ktor.sse.* | |||
import io.ktor.util.reflect.* | |||
import kotlinx.coroutines.* | |||
import kotlinx.coroutines.flow.* | |||
|
|||
/** | |||
* A Server-sent events session. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider adding a reference on what SSE is on the wiki page
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it also would be great to highlight where the receiver type is used in the example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated KDoc to:
A session for handling Server-Sent Events (SSE) from a server.
Example of usage:
client.sse("http://localhost:8080/sse") { // `this` is `ClientSSESession`
incoming.collect { event ->
println("Id: ${event.id}")
println("Event: ${event.event}")
println("Data: ${event.data}")
}
}
To learn more, see the SSE and the SSE specification.
@vnikolova, could you help us check KDocs? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The KDocs look clear - I made some minor suggestions.
I'm not sure if it's a common practice, but it might be useful to add comments in the code examples for added clarity.
ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt
Outdated
Show resolved
Hide resolved
ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt
Outdated
Show resolved
Hide resolved
ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt
Outdated
Show resolved
Hide resolved
ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt
Outdated
Show resolved
Hide resolved
ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt
Outdated
Show resolved
Hide resolved
ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSE.kt
Outdated
Show resolved
Hide resolved
ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/Routing.kt
Outdated
Show resolved
Hide resolved
ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/Routing.kt
Outdated
Show resolved
Hide resolved
ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/Routing.kt
Outdated
Show resolved
Hide resolved
ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSESession.kt
Outdated
Show resolved
Hide resolved
5e30c20
to
2ee64f0
Compare
…/sse/ClientSSESession.kt Co-authored-by: Vik Nikolova <[email protected]>
…/sse/ClientSSESession.kt Co-authored-by: Vik Nikolova <[email protected]>
…/sse/ClientSSESession.kt Co-authored-by: Vik Nikolova <[email protected]>
…/sse/SSE.kt Co-authored-by: Vik Nikolova <[email protected]>
…/sse/SSE.kt Co-authored-by: Vik Nikolova <[email protected]>
…ktor/server/sse/Routing.kt Co-authored-by: Vik Nikolova <[email protected]>
…ktor/server/sse/Routing.kt Co-authored-by: Vik Nikolova <[email protected]>
…ktor/server/sse/SSESession.kt Co-authored-by: Vik Nikolova <[email protected]>
0502a51
to
dbef334
Compare
Subsystem
Client SSE, Server SSE
Motivation
KTOR-7435