Skip to content

Commit

Permalink
Merge pull request #437 from armanbilge/topic/optimize-handleRequest
Browse files Browse the repository at this point in the history
Optimize `IOLambda#handleRequest` on JVM
  • Loading branch information
armanbilge authored Dec 18, 2023
2 parents 13d45c6 + 8ca2706 commit fa8c79a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 32 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ lazy val lambda = crossProject(JSPlatform, JVMPlatform)
.jvmSettings(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-lambda-java-core" % "1.2.3",
"co.fs2" %%% "fs2-io" % fs2Version,
"io.circe" %%% "circe-fs2" % "0.14.1"
"co.fs2" %%% "fs2-io" % fs2Version
)
)
.dependsOn(core)
Expand Down
46 changes: 17 additions & 29 deletions lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,41 @@

package feral.lambda

import cats.data.OptionT
import cats.effect.IO
import cats.effect.kernel.Resource
import com.amazonaws.services.lambda.{runtime => lambdaRuntime}
import io.circe
import io.circe.Printer
import io.circe.jawn
import io.circe.syntax._

import java.io.InputStream
import java.io.OutputStream
import java.io.OutputStreamWriter
import java.nio.channels.Channels
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.duration._

private[lambda] abstract class IOLambdaPlatform[Event, Result]
extends lambdaRuntime.RequestStreamHandler { this: IOLambda[Event, Result] =>

final def handleRequest(
input: InputStream,
output: OutputStream,
context: lambdaRuntime.Context): Unit = {
runtimeContext: lambdaRuntime.Context): Unit = {
val (dispatcher, lambda) =
Await.result(setupMemo, Duration.Inf)
Await.result(setupMemo, runtimeContext.getRemainingTimeInMillis().millis)

dispatcher.unsafeRunSync {
Resource
.eval {
for {
event <- fs2
.io
.readInputStream(IO.pure(input), 8192, closeAfterUse = false)
.through(circe.fs2.byteStreamParser)
.through(circe.fs2.decoder[IO, Event])
.head
.compile
.lastOrError
context <- IO(Context.fromJava[IO](context))
_ <- OptionT(lambda(event, context)).foreachF { result =>
Resource.fromAutoCloseable(IO(new OutputStreamWriter(output))).use { writer =>
IO.blocking(Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer))
}
}
} yield ()
}
.onFinalize(IO.blocking(input.close()) &> IO.blocking(output.close()))
.use_
}
val event = jawn.decodeChannel[Event](Channels.newChannel(input)).fold(throw _, identity(_))
val context = Context.fromJava[IO](runtimeContext)
dispatcher
.unsafeRunTimed(
lambda(event, context),
runtimeContext.getRemainingTimeInMillis().millis
)
.foreach { result =>
val writer = new OutputStreamWriter(output)
Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer)
writer.flush()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class IOLambdaJvmSuite extends FunSuite {
override def getInvokedFunctionArn(): String = ""
override def getIdentity(): runtime.CognitoIdentity = null
override def getClientContext(): runtime.ClientContext = null
override def getRemainingTimeInMillis(): Int = 0
override def getRemainingTimeInMillis(): Int = Int.MaxValue
override def getMemoryLimitInMB(): Int = 0
override def getLogger(): runtime.LambdaLogger = null
}
Expand Down

0 comments on commit fa8c79a

Please sign in to comment.