Skip to content

Commit

Permalink
0.1.0 release
Browse files Browse the repository at this point in the history
  • Loading branch information
henryjcee committed Feb 25, 2024
1 parent 7c6c864 commit 313a8ce
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 40 deletions.
24 changes: 16 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

I've spent long enough faffing around with reactive database drivers etc. and inspired
by [this issue](https://youtrack.jetbrains.com/issue/KTOR-6734/Jetty-engine-Upgrade-Jetty-dependencies-to-the-latest-version-12)
I have decided to build a POC that demonstrates virtual thread support in Ktor using Jetty 12. Most of this is code
I have decided to build a POC that demonstrates **virtual thread support in Ktor using Jetty 12**. Most of this is code
adapted from the existing Ktor Jetty engine but I've completely dropped servlet support which should be the fastest way
to run Jetty.
to run Jetty. **Websockets and HTTP3 seem to working** but I haven't tested either in anger.

It should be pretty easy to use, add the dep and start a server like:

Expand All @@ -23,11 +23,11 @@ blocking OS threads and using libs that make use of `ThreadLocal`.

- Built on Ktor `3.0.0-beta1`. This makes it incompatible with `2.x.x` and also doesn't support the new kotlinx IO libs
that are going to used in Kotlin 4.
- This seems to work for most HTTP 1.1 requests that I've tried but the HTTPX/websocket support almost certainly doesn't
work.
- This seems to work for most HTTP 1.1 requests that I've tried ~but the HTTPX/websocket support almost certainly doesn't
work~ and HTTP3 and websockets seem to be working.
- The vthread dispatcher works but I have no idea if it's a good idea.
- Java 21+ only
- `initializeServer()` is a mess
- Java 21+ only (of course).
- I suspect there are problems in how concurrency is structured in the websockets logic.

## Changelog

Expand All @@ -41,12 +41,20 @@ blocking OS threads and using libs that make use of `ThreadLocal`.

- No content response support
- Implemented byte-array response support
-
-

### 0.0.4

- Websockets support (needs work)
- HTTP3 support
- HTTP3 support

### 0.1.0

- Switched dispatcher to unbounded task queue instead of a rendezvous. Not sure why I did that to start with as it (
obviously) leads to deadlocks
- Added `loomAsync {}` to go with `loomLaunch {}`
- TLS is working (see Ktor docs for config)
- Websockets support has been worked on and I've got it running over http and https

## Contributing

Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

group = "com.henrycourse"
version = "0.0.3"
version = "0.1.0"

publishing {
publications {
Expand Down
20 changes: 17 additions & 3 deletions src/main/kotlin/com/henrycourse/coroutines/LoomDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import io.ktor.utils.io.WriterScope
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Not the prettiest thing in the in world but it works
// Would be nice to get these working with launch {} / async {} but I've already spent too long on this
fun CoroutineScope.loomLaunch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
Expand All @@ -29,11 +31,22 @@ fun CoroutineScope.loomLaunch(
}
}

fun <T> CoroutineScope.loomAsync(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val dispatcher = LoomDispatcher()
return async(context + dispatcher, start, block).apply {
dispatcher.close()
}
}

internal val closeJob: Runnable = Runnable {}

class LoomDispatcher : CoroutineDispatcher() {

private val taskQueue = ArrayBlockingQueue<Runnable>(1)
private val taskQueue = LinkedBlockingQueue<Runnable>()

init {
Thread.ofVirtual().start {
Expand All @@ -57,6 +70,7 @@ class LoomDispatcher : CoroutineDispatcher() {
}
}

// Replicas of the stuff from ktor-server-jetty
class LoomChannelJob(
private val delegate: Job,
override val channel: ByteChannel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.henrycourse.jetty


import io.ktor.server.application.Application
import io.ktor.server.engine.BaseApplicationCall
import io.ktor.server.engine.BaseApplicationRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import io.ktor.utils.io.close
import io.ktor.utils.io.pool.ByteBufferPool
import io.ktor.utils.io.pool.DirectByteBufferPool
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import org.eclipse.jetty.server.Request
import org.eclipse.jetty.server.Response
import org.eclipse.jetty.util.Callback
Expand Down Expand Up @@ -55,22 +57,25 @@ class Jetty12ApplicationResponse(

val connection = request.connectionMetaData.connection
val endpoint = connection.endPoint
endpoint.idleTimeout = 60 * 1000
endpoint.idleTimeout = 6000 * 1000

val websocketConnection = Jetty12WebsocketConnection(endpoint, coroutineContext)
response.write(true, emptyBuffer, Callback.from { endpoint.upgrade(websocketConnection) })

// TODO: Check what needs to happen with contexts here
val upgradeJob = upgrade.upgrade(
websocketConnection.inputChannel,
websocketConnection.outputChannel,
LoomDispatcher(),
LoomDispatcher()
coroutineContext,
)

upgradeJob.invokeOnCompletion {
websocketConnection.inputChannel.cancel()
websocketConnection.inputChannel.close()
websocketConnection.outputChannel.close()
}

upgradeJob.join()
}

private val responseJob: Lazy<ReaderJob> = lazy {
Expand Down
58 changes: 34 additions & 24 deletions src/main/kotlin/com/henrycourse/jetty/Jetty12WebsocketConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.ktor.util.cio.ChannelWriteException
import io.ktor.utils.io.ByteChannel
import io.ktor.utils.io.close
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.suspendCancellableCoroutine
import org.eclipse.jetty.io.AbstractConnection
import org.eclipse.jetty.io.EndPoint
Expand All @@ -16,7 +16,7 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException


// TODO: Needs sensible error handling
class Jetty12WebsocketConnection(
private val endpoint: EndPoint,
override val coroutineContext: CoroutineContext
Expand All @@ -29,24 +29,43 @@ class Jetty12WebsocketConnection(
val inputChannel = ByteChannel(true)
val outputChannel = ByteChannel(false)

private val channel = Channel<Boolean>(Channel.RENDEZVOUS)

init {

fillInterested()
// Input job
loomLaunch {

// TODO: Handle errors
while (true) {

fillInterested()
channel.receive()

inputBuffer.clear().flip()

val read = endpoint.fill(inputBuffer)

if (read > 0) {
inputChannel.writeFully(inputBuffer)
} else if (read == -1) {
endpoint.close()
}
}
}

// Output job
loomLaunch {

// TODO: Handle errors
while (true) {

if (outputChannel.isClosedForRead) {
return@loomLaunch
}
val outputBytes = outputChannel.readAvailable(outputBuffer.rewind())

if (outputBytes < 0 || !endpoint.isOpen) {
outputChannel.close()
bufferPool.recycle(outputBuffer)
return@loomLaunch
} else {
if (outputBytes > -1) {
suspendCancellableCoroutine<Unit> {
endpoint.write(
object : Callback {
Expand All @@ -60,31 +79,22 @@ class Jetty12WebsocketConnection(
outputBuffer.flip()
)
}
} else {
outputChannel.close()
bufferPool.recycle(outputBuffer)
return@loomLaunch
}
}
}
}

// TODO: Handle errors
override fun onFillInterestedFailed(cause: Throwable) {
endpoint.close()
inputChannel.close()
bufferPool.recycle(inputBuffer)
throw cause
}

override fun onFillable() {

if (endpoint.fill(inputBuffer.rewind().flip()) > -1) {
runBlocking {
inputChannel.writeFully(inputBuffer)
}
try {
fillInterested()
} catch (e: Throwable) {
onFillInterestedFailed(e)
}
} else {
endpoint.close()
}
channel.trySend(true)
}

override fun onUpgradeTo(buffer: ByteBuffer?) = TODO()
Expand Down

0 comments on commit 313a8ce

Please sign in to comment.