Skip to content

Commit

Permalink
KTOR-3658 Replace custom WeakTimeoutQueue with coroutines.withTimeout (
Browse files Browse the repository at this point in the history
…#2794)

Co-authored-by: hfhbd <[email protected]>
  • Loading branch information
2 people authored and osipxd committed Nov 14, 2024
1 parent 7949269 commit b0fd16e
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 475 deletions.
11 changes: 1 addition & 10 deletions ktor-server/ktor-server-cio/api/ktor-server-cio.api
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public final class io/ktor/server/cio/backend/ServerIncomingConnection {
}

public final class io/ktor/server/cio/backend/ServerPipelineKt {
public static final fun startServerConnectionPipeline (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/cio/backend/ServerIncomingConnection;Lio/ktor/server/cio/internal/WeakTimeoutQueue;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/Job;
public static final fun startServerConnectionPipeline-exY8QGI (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/cio/backend/ServerIncomingConnection;JLkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/Job;
}

public final class io/ktor/server/cio/backend/ServerRequestScope : kotlinx/coroutines/CoroutineScope {
Expand All @@ -85,12 +85,3 @@ public final class io/ktor/server/cio/backend/ServerRequestScope : kotlinx/corou
public final fun withContext (Lkotlin/coroutines/CoroutineContext;)Lio/ktor/server/cio/backend/ServerRequestScope;
}

public final class io/ktor/server/cio/internal/WeakTimeoutQueue {
public fun <init> (JLkotlin/jvm/functions/Function0;)V
public synthetic fun <init> (JLkotlin/jvm/functions/Function0;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun cancel ()V
public final fun getTimeoutMillis ()J
public final fun process ()V
public final fun withTimeout (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import java.util.concurrent.atomic.*
import kotlin.coroutines.*
import kotlin.test.*
import kotlin.test.Test
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

@OptIn(InternalAPI::class)
class ServerPipelineTest : CoroutineScope {
Expand Down Expand Up @@ -51,8 +53,7 @@ class ServerPipelineTest : CoroutineScope {
@Test
fun testSmoke(): Unit = runBlocking {
val connection = ServerIncomingConnection(ByteChannel(), ByteChannel(), null, null)
val queue = WeakTimeoutQueue(10L) { 1L }
val job = startServerConnectionPipeline(connection, queue) {
val job = startServerConnectionPipeline(connection, timeout = Duration.INFINITE) {
error("Shouldn't reach here")
}

Expand All @@ -67,8 +68,7 @@ class ServerPipelineTest : CoroutineScope {
val requestsReceived = ArrayList<String>()

val connection = ServerIncomingConnection(input, output, null, null)
val queue = WeakTimeoutQueue(10L) { 1L }
startServerConnectionPipeline(connection, queue) { request ->
startServerConnectionPipeline(connection, timeout = Duration.INFINITE) { request ->
requestsReceived += request.uri.toString()
assertEquals("/", request.uri.toString())
assertEquals("GET", request.method.value)
Expand Down Expand Up @@ -102,8 +102,7 @@ class ServerPipelineTest : CoroutineScope {
val requestsReceived = ArrayList<String>()

val connection = ServerIncomingConnection(input, output, null, null)
val queue = WeakTimeoutQueue(10L) { 1L }
startServerConnectionPipeline(connection, queue) { request ->
startServerConnectionPipeline(connection, timeout = Duration.INFINITE) { request ->
requestsReceived += request.uri.toString()
assertEquals("/", request.uri.toString())
assertEquals("GET", request.method.value)
Expand Down Expand Up @@ -139,8 +138,7 @@ class ServerPipelineTest : CoroutineScope {
val latch = Job()

val connection = ServerIncomingConnection(input, output, null, null)
val queue = WeakTimeoutQueue(1L)
startServerConnectionPipeline(connection, queue) { request ->
startServerConnectionPipeline(connection, timeout = 1.milliseconds) { request ->
requestsReceived += request.uri.toString()
assertEquals("/", request.uri.toString())
assertEquals("GET", request.method.value)
Expand Down Expand Up @@ -168,7 +166,6 @@ class ServerPipelineTest : CoroutineScope {
assertEquals("/", requestsReceived.single())

delay(100)
queue.process() // shouldn't be cancelled here because it is upgraded and running
latch.complete()

input.close()
Expand All @@ -180,24 +177,12 @@ class ServerPipelineTest : CoroutineScope {
val input = ByteChannel()
val output = ByteChannel()

val clock = AtomicLong(1L)

val connection = ServerIncomingConnection(input, output, null, null)
val queue = WeakTimeoutQueue(10L) { clock.get() }
supervisorScope {
val job = startServerConnectionPipeline(connection, queue) {
startServerConnectionPipeline(connection, Duration.ZERO) {
error("Shouldn't reach here")
}

// the timeout queue is only working when there is at least small activity
launch(CoroutineName("poller")) {
do {
queue.process()
delay(50)
clock.addAndGet(11L)
} while (job.isActive)
}

// it's important to close the joint channel as it happens in real networks
// this is really only a test specific thing
launch(CoroutineName("IO helper")) {
Expand All @@ -215,13 +200,11 @@ class ServerPipelineTest : CoroutineScope {
val input = ByteChannel()
val output = ByteChannel()

val clock = AtomicLong(1L)
val requestHandled = Job()

val connection = ServerIncomingConnection(input, output, null, null)
val queue = WeakTimeoutQueue(10L) { clock.get() }
supervisorScope {
val job = startServerConnectionPipeline(connection, queue) { request ->
startServerConnectionPipeline(connection, timeout = 100.milliseconds) { request ->
requestHandled.complete()
request.release()
input.cancel()
Expand All @@ -235,15 +218,6 @@ class ServerPipelineTest : CoroutineScope {
// after processing the request, the idle timeout machinery should cancel all the stuff
requestHandled.join()

// the timeout queue is only working when there is at least small activity
launch(CoroutineName("poller")) {
do {
queue.process()
delay(50)
clock.addAndGet(11L)
} while (job.isActive)
}

// it's important to close the joint channel as it happens in real networks
// this is really only a test specific thing
launch(CoroutineName("IO helper")) {
Expand All @@ -259,11 +233,10 @@ class ServerPipelineTest : CoroutineScope {
@Test
fun testParentJobAndTimerCancellation() {
val l = CountDownLatch(1)
val queue = WeakTimeoutQueue(100000L)

val root = launch(coroutineContext) {
val connection = ServerIncomingConnection(ByteChannel(), ByteChannel(), null, null)
startServerConnectionPipeline(connection, queue) {
startServerConnectionPipeline(connection, timeout = Duration.INFINITE) {
error("Shouldn't reach here")
}
l.countDown()
Expand All @@ -274,7 +247,6 @@ class ServerPipelineTest : CoroutineScope {

runBlocking {
root.cancel()
queue.cancel()

root.join()
}
Expand All @@ -283,11 +255,10 @@ class ServerPipelineTest : CoroutineScope {
@Test
fun testParentJobAndTimeoutCancellation(): Unit = runBlocking(coroutineContext) {
val l = Job()
val queue = WeakTimeoutQueue(10L)

val root = launch {
val connection = ServerIncomingConnection(ByteChannel(), ByteChannel(), null, null)
startServerConnectionPipeline(connection, queue) {
startServerConnectionPipeline(connection, timeout = 10.milliseconds) {
error("Shouldn't reach here")
}
l.complete()
Expand All @@ -296,10 +267,6 @@ class ServerPipelineTest : CoroutineScope {
l.join()
delay(100) // we need this delay because launching a coroutine takes time

launch {
queue.cancel()
}

delay(1)
root.cancelAndJoin()
}
Expand Down
Loading

0 comments on commit b0fd16e

Please sign in to comment.