Skip to content

Commit

Permalink
Change ServerRunner signature to Resource
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-klass committed Jan 22, 2025
1 parent d4ec1cf commit 2044178
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 137 deletions.
48 changes: 21 additions & 27 deletions perf-tests/src/main/scala/sttp/tapir/perf/apis/ServerRunner.scala
Original file line number Diff line number Diff line change
@@ -1,48 +1,42 @@
package sttp.tapir.perf.apis

import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.{IO, Resource, ResourceApp}

import scala.reflect.runtime.universe

trait ServerRunner {
def start: IO[ServerRunner.KillSwitch]
def runServer: Resource[IO, Unit]
}

/** Can be used as a Main object to run a single server using its short name. Running perfTests/runMain
* [[sttp.tapir.perf.apis.ServerRunner]] will load special javaOptions configured in build.sbt, enabling recording JFR metrics. This is
* useful when you want to guarantee that the server runs in a different JVM than test runner, so that memory and CPU metrics are recorded
* only in the scope of the server JVM.
*/
object ServerRunner extends IOApp {
type KillSwitch = IO[Unit]
val NoopKillSwitch = IO.pure(IO.unit)
object ServerRunner extends ResourceApp.Forever {

private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
private val requireArg: Resource[IO, Unit] = Resource.raiseError(
new IllegalArgumentException(s"Unspecified server name. Use one of: ${TypeScanner.allServers}"): Throwable
)
private def notInstantiated(name: ServerName)(e: Throwable): IO[ServerRunner] = IO.raiseError(
new IllegalArgumentException(
s"ERROR! Could not find object ${name.fullName} or it doesn't extend ServerRunner", e
)
)

def run(args: List[String]): IO[ExitCode] = {
val shortServerName = args.headOption.getOrElse {
throw new IllegalArgumentException(s"Unspecified server name. Use one of: ${TypeScanner.allServers}")
}
for {
killSwitch <- startServerByTypeName(ServerName.fromShort(shortServerName))
_ <- IO.never.guarantee(killSwitch)
} yield ExitCode.Success
}
def run(args: List[String]): Resource[IO, Unit] =
args.headOption.map(ServerName.fromShort).map(startServerByTypeName).getOrElse(requireArg)

def startServerByTypeName(serverName: ServerName): IO[ServerRunner.KillSwitch] = {
def startServerByTypeName(serverName: ServerName): Resource[IO, Unit] =
serverName match {
case ExternalServerName => NoopKillSwitch
case _ =>
try {
case ExternalServerName => Resource.unit
case _ => Resource.eval(
IO({
val moduleSymbol = runtimeMirror.staticModule(serverName.fullName)
val moduleMirror = runtimeMirror.reflectModule(moduleSymbol)
val instance: ServerRunner = moduleMirror.instance.asInstanceOf[ServerRunner]
instance.start
} catch {
case e: Throwable =>
IO.raiseError(
new IllegalArgumentException(s"ERROR! Could not find object ${serverName.fullName} or it doesn't extend ServerRunner", e)
)
}
moduleMirror.instance.asInstanceOf[ServerRunner]
}).handleErrorWith(notInstantiated(serverName))
).flatMap(_.runServer)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.github.classgraph.ClassGraph

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import sttp.tapir.perf.Common._

Expand Down
19 changes: 8 additions & 11 deletions perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,21 @@ object Tapir extends Endpoints {
object server {
val maxConnections = 65536
val connectorPoolSize: Int = Math.max(2, Runtime.getRuntime.availableProcessors() / 4)
def runServer(
router: WebSocketBuilder2[IO] => HttpRoutes[IO]
): IO[ServerRunner.KillSwitch] =
def runServer(router: WebSocketBuilder2[IO] => HttpRoutes[IO]): Resource[IO, Unit] =
BlazeServerBuilder[IO]
.bindHttp(Port, "localhost")
.withHttpWebSocketApp(wsb => router(wsb).orNotFound)
.withMaxConnections(maxConnections)
.withConnectorPoolSize(connectorPoolSize)
.resource
.useForever
.start
.map(_.cancel *> IO.println("Http4s server closed."))
.map(_ => ())
.onFinalize(IO.println("Http4s server closed."))
}

object TapirServer extends ServerRunner { override def start = server.runServer(Tapir.router(1)) }
object TapirMultiServer extends ServerRunner { override def start = server.runServer(Tapir.router(128)) }
object TapirServer extends ServerRunner { override def runServer = server.runServer(Tapir.router(1)) }
object TapirMultiServer extends ServerRunner { override def runServer = server.runServer(Tapir.router(128)) }
object TapirInterceptorMultiServer extends ServerRunner {
override def start = server.runServer(Tapir.router(128, withServerLog = true))
override def runServer = server.runServer(Tapir.router(128, withServerLog = true))
}
object VanillaServer extends ServerRunner { override def start = server.runServer(Vanilla.router(1)) }
object VanillaMultiServer extends ServerRunner { override def start = server.runServer(Vanilla.router(128)) }
object VanillaServer extends ServerRunner { override def runServer = server.runServer(Vanilla.router(1)) }
object VanillaMultiServer extends ServerRunner { override def runServer = server.runServer(Vanilla.router(128)) }
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import sttp.tapir.perf.apis._
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.cats.NettyCatsServer
import sttp.tapir.server.netty.cats.NettyCatsServerOptions
import sttp.ws.WebSocketFrame
import sttp.capabilities.fs2.Fs2Streams

import scala.concurrent.duration._
Expand All @@ -33,27 +32,26 @@ object NettyCats {
Tapir.wsResponseStream.evalMap(_ => IO.realTime.map(_.toMillis)).concurrently(in.as(()))
}
)
def runServer(endpoints: List[ServerEndpoint[Any, IO]], withServerLog: Boolean = false): IO[ServerRunner.KillSwitch] = {
def runServer(endpoints: List[ServerEndpoint[Any, IO]], withServerLog: Boolean = false): Resource[IO, Unit] = {
val declaredPort = Port
val declaredHost = "0.0.0.0"
(for {
for {
dispatcher <- Dispatcher.parallel[IO]
serverOptions = buildOptions(NettyCatsServerOptions.customiseInterceptors(dispatcher), withServerLog)
server <- NettyCatsServer.io()
_ <-
Resource.make(
server
.port(declaredPort)
.host(declaredHost)
.addEndpoints(wsServerEndpoint :: endpoints)
.start()
)(binding => binding.stop())
} yield ()).allocated.map(_._2)
server <- NettyCatsServer.io().map(_.options(serverOptions))
_ <- Resource.make(
server
.port(declaredPort)
.host(declaredHost)
.addEndpoints(wsServerEndpoint :: endpoints)
.start()
)(_.stop())
} yield ()
}
}

object TapirServer extends ServerRunner { override def start = NettyCats.runServer(Tapir.genEndpointsIO(1)) }
object TapirMultiServer extends ServerRunner { override def start = NettyCats.runServer(Tapir.genEndpointsIO(128)) }
object TapirServer extends ServerRunner { override def runServer = NettyCats.runServer(Tapir.genEndpointsIO(1)) }
object TapirMultiServer extends ServerRunner { override def runServer = NettyCats.runServer(Tapir.genEndpointsIO(128)) }
object TapirInterceptorMultiServer extends ServerRunner {
override def start = NettyCats.runServer(Tapir.genEndpointsIO(128), withServerLog = true)
override def runServer = NettyCats.runServer(Tapir.genEndpointsIO(128), withServerLog = true)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sttp.tapir.perf.netty.future

import cats.effect.IO
import cats.effect.{IO, Resource}
import sttp.tapir.perf.apis._
import sttp.tapir.perf.Common._
import sttp.tapir.server.netty.{NettyFutureServer, NettyFutureServerBinding, NettyFutureServerOptions}
Expand All @@ -14,7 +14,7 @@ object Tapir extends Endpoints

object NettyFuture {

def runServer(endpoints: List[ServerEndpoint[Any, Future]], withServerLog: Boolean = false): IO[ServerRunner.KillSwitch] = {
def runServer(endpoints: List[ServerEndpoint[Any, Future]], withServerLog: Boolean = false): Resource[IO, Unit] = {
val declaredPort = Port
val declaredHost = "0.0.0.0"
val serverOptions = buildOptions(NettyFutureServerOptions.customiseInterceptors, withServerLog)
Expand All @@ -29,13 +29,12 @@ object NettyFuture {
.start()
)
)

serverBinding.map(b => IO.fromFuture(IO(b.stop())))
Resource.make(serverBinding)(b => IO.fromFuture(IO(b.stop()))).map(_ => ())
}
}

object TapirServer extends ServerRunner { override def start = NettyFuture.runServer(Tapir.genEndpointsFuture(1)) }
object TapirMultiServer extends ServerRunner { override def start = NettyFuture.runServer(Tapir.genEndpointsFuture(128)) }
object TapirServer extends ServerRunner { override def runServer = NettyFuture.runServer(Tapir.genEndpointsFuture(1)) }
object TapirMultiServer extends ServerRunner { override def runServer = NettyFuture.runServer(Tapir.genEndpointsFuture(128)) }
object TapirInterceptorMultiServer extends ServerRunner {
override def start = NettyFuture.runServer(Tapir.genEndpointsFuture(128), withServerLog = true)
override def runServer = NettyFuture.runServer(Tapir.genEndpointsFuture(128), withServerLog = true)
}
32 changes: 17 additions & 15 deletions perf-tests/src/main/scala/sttp/tapir/perf/nima/Nima.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sttp.tapir.perf.nima

import cats.effect.IO
import cats.effect.{IO, Resource}
import io.helidon.webserver.WebServer
import sttp.shared.Identity
import sttp.tapir.perf.apis._
Expand All @@ -14,27 +14,29 @@ object Tapir extends Endpoints {

object Nima {

def runServer(endpoints: List[ServerEndpoint[Any, Identity]], withServerLog: Boolean = false): IO[ServerRunner.KillSwitch] = {
def runServer(endpoints: List[ServerEndpoint[Any, Identity]], withServerLog: Boolean = false): Resource[IO, Unit] = {
val declaredPort = Port
val serverOptions = buildOptions(NimaServerOptions.customiseInterceptors, withServerLog)
// Starting Nima server

val handler = NimaServerInterpreter(serverOptions).toHandler(endpoints)
val server = WebServer
.builder()
.routing { builder =>
builder.any(handler)
()
}
.port(declaredPort)
.build()
.start()
IO(IO { val _ = server.stop() })
val startServer = IO {
WebServer
.builder()
.routing { builder =>
builder.any(handler)
()
}
.port(declaredPort)
.build()
.start()
}
Resource.make(startServer)(server => IO(server.stop()).void).map(_ => ())
}
}

object TapirServer extends ServerRunner { override def start = Nima.runServer(Tapir.genEndpointsNId(1)) }
object TapirMultiServer extends ServerRunner { override def start = Nima.runServer(Tapir.genEndpointsNId(128)) }
object TapirServer extends ServerRunner { override def runServer = Nima.runServer(Tapir.genEndpointsNId(1)) }
object TapirMultiServer extends ServerRunner { override def runServer = Nima.runServer(Tapir.genEndpointsNId(128)) }
object TapirInterceptorMultiServer extends ServerRunner {
override def start = Nima.runServer(Tapir.genEndpointsNId(128), withServerLog = true)
override def runServer = Nima.runServer(Tapir.genEndpointsNId(128), withServerLog = true)
}
42 changes: 20 additions & 22 deletions perf-tests/src/main/scala/sttp/tapir/perf/pekko/PekkoHttp.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sttp.tapir.perf.pekko

import cats.effect.IO
import cats.effect.{IO, Resource}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.HttpEntity
Expand All @@ -14,7 +14,7 @@ import sttp.tapir.perf.apis._
import sttp.tapir.server.pekkohttp.{PekkoHttpServerInterpreter, PekkoHttpServerOptions}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.{ExecutionContext, Future}

object PekkoCommon {
// Define a source that emits the current timestamp every 100 milliseconds
Expand Down Expand Up @@ -103,27 +103,25 @@ object Tapir extends Endpoints {
}

object PekkoHttp {
def runServer(router: ActorSystem => Route): IO[ServerRunner.KillSwitch] = {
// We need to create a new actor system each time server is run
implicit val actorSystem: ActorSystem = ActorSystem("tapir-pekko-http")
implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
IO.fromFuture(
IO(
Http()
.newServerAt("127.0.0.1", Port)
.bind(router(actorSystem))
.map { binding =>
IO.fromFuture(IO(binding.unbind().flatMap(_ => actorSystem.terminate()))).void
}
)
)
}
private val actorSystem = Resource.make(IO(ActorSystem("tapir-pekko-http")))(
aSystem => IO.fromFuture(IO(aSystem.terminate())).void
)

private def http(route: Route)(implicit aSystem: ActorSystem) = Resource.make(
IO.fromFuture(IO(Http().newServerAt("127.0.0.1", Port).bind(route)))
)(
binding => IO.fromFuture(IO(binding.unbind())).void
)

// We need to create a new actor system each time server is run
def runServer(router: ActorSystem => Route): Resource[IO, Unit] =
actorSystem.flatMap { implicit aSystem: ActorSystem => http(router(aSystem)) }.map(_ => ())
}

object TapirServer extends ServerRunner { override def start = PekkoHttp.runServer(Tapir.router(1)) }
object TapirMultiServer extends ServerRunner { override def start = PekkoHttp.runServer(Tapir.router(128)) }
object TapirServer extends ServerRunner { override def runServer = PekkoHttp.runServer(Tapir.router(1)) }
object TapirMultiServer extends ServerRunner { override def runServer = PekkoHttp.runServer(Tapir.router(128)) }
object TapirInterceptorMultiServer extends ServerRunner {
override def start = PekkoHttp.runServer(Tapir.router(128, withServerLog = true))
override def runServer = PekkoHttp.runServer(Tapir.router(128, withServerLog = true))
}
object VanillaServer extends ServerRunner { override def start = PekkoHttp.runServer(Vanilla.router(1)) }
object VanillaMultiServer extends ServerRunner { override def start = PekkoHttp.runServer(Vanilla.router(128)) }
object VanillaServer extends ServerRunner { override def runServer = PekkoHttp.runServer(Vanilla.router(1)) }
object VanillaMultiServer extends ServerRunner { override def runServer = PekkoHttp.runServer(Vanilla.router(128)) }
52 changes: 30 additions & 22 deletions perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sttp.tapir.perf.play

import cats.effect.IO
import cats.effect.{IO, Resource}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.util.ByteString
import play.api.Mode
Expand Down Expand Up @@ -87,28 +87,36 @@ object Tapir extends Endpoints {

object Play {

def runServer(routes: ActorSystem => Routes): IO[ServerRunner.KillSwitch] = {
implicit lazy val perfActorSystem: ActorSystem = ActorSystem(s"tapir-play")
val playRouter =
Router.from(
List(routes(perfActorSystem)).reduce((a: Routes, b: Routes) => {
val handler: PartialFunction[RequestHeader, Handler] = { case request =>
a.applyOrElse(request, b)
}
handler
})
)
val components = new DefaultPekkoHttpServerComponents {
private val actorSystem = Resource.make(
IO(ActorSystem("tapir-play"))
)(
aSystem => IO.fromFuture(IO(aSystem.terminate())).void
)

private def httpServer(routes: Routes, actSys: ActorSystem) = Resource.make(IO {
val server = new DefaultPekkoHttpServerComponents {
override lazy val serverConfig: ServerConfig = ServerConfig(port = Some(Port), address = "127.0.0.1", mode = Mode.Test)
override lazy val actorSystem: ActorSystem = perfActorSystem
override def router: Router = playRouter
override lazy val actorSystem: ActorSystem = actSys
override def router: Router = Router.from(routes)
}
IO(components.server).map(server => IO(server.stop()))
}
server.server
})(server => IO(server.stop()))

def runServer(routes: ActorSystem => Routes): Resource[IO, Unit] = actorSystem.flatMap {
aSystem => httpServer(
List(routes(aSystem)).reduce((a: Routes, b: Routes) => {
val handler: PartialFunction[RequestHeader, Handler] = { case request =>
a.applyOrElse(request, b)
}
handler
}),
aSystem
)
}.map(_ => ())
}

object TapirServer extends ServerRunner { override def start = Play.runServer(Tapir.router(1)) }
object TapirMultiServer extends ServerRunner { override def start = Play.runServer(Tapir.router(128)) }
object TapirInterceptorMultiServer extends ServerRunner { override def start = Play.runServer(Tapir.router(128, withServerLog = true)) }
object VanillaServer extends ServerRunner { override def start = Play.runServer(Vanilla.router(1)) }
object VanillaMultiServer extends ServerRunner { override def start = Play.runServer(Vanilla.router(128)) }
object TapirServer extends ServerRunner { override def runServer = Play.runServer(Tapir.router(1)) }
object TapirMultiServer extends ServerRunner { override def runServer = Play.runServer(Tapir.router(128)) }
object TapirInterceptorMultiServer extends ServerRunner { override def runServer = Play.runServer(Tapir.router(128, withServerLog = true)) }
object VanillaServer extends ServerRunner { override def runServer = Play.runServer(Vanilla.router(1)) }
object VanillaMultiServer extends ServerRunner { override def runServer = Play.runServer(Vanilla.router(128)) }
Loading

0 comments on commit 2044178

Please sign in to comment.