diff --git a/build.sbt b/build.sbt index c8e13f47..1aa43028 100644 --- a/build.sbt +++ b/build.sbt @@ -122,8 +122,7 @@ lazy val lambda = crossProject(JSPlatform, JVMPlatform) .jvmSettings( libraryDependencies ++= Seq( "com.amazonaws" % "aws-lambda-java-core" % "1.2.3", - "co.fs2" %%% "fs2-io" % fs2Version, - "io.circe" %%% "circe-fs2" % "0.14.1" + "co.fs2" %%% "fs2-io" % fs2Version ) ) .dependsOn(core) diff --git a/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala b/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala index ccbb8a78..419d894d 100644 --- a/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala +++ b/lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala @@ -16,19 +16,18 @@ package feral.lambda -import cats.data.OptionT import cats.effect.IO -import cats.effect.kernel.Resource import com.amazonaws.services.lambda.{runtime => lambdaRuntime} -import io.circe import io.circe.Printer +import io.circe.jawn import io.circe.syntax._ import java.io.InputStream import java.io.OutputStream import java.io.OutputStreamWriter +import java.nio.channels.Channels import scala.concurrent.Await -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ private[lambda] abstract class IOLambdaPlatform[Event, Result] extends lambdaRuntime.RequestStreamHandler { this: IOLambda[Event, Result] => @@ -36,33 +35,22 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result] final def handleRequest( input: InputStream, output: OutputStream, - context: lambdaRuntime.Context): Unit = { + runtimeContext: lambdaRuntime.Context): Unit = { val (dispatcher, lambda) = - Await.result(setupMemo, Duration.Inf) + Await.result(setupMemo, runtimeContext.getRemainingTimeInMillis().millis) - dispatcher.unsafeRunSync { - Resource - .eval { - for { - event <- fs2 - .io - .readInputStream(IO.pure(input), 8192, closeAfterUse = false) - .through(circe.fs2.byteStreamParser) - .through(circe.fs2.decoder[IO, Event]) - .head - .compile - .lastOrError - context <- IO(Context.fromJava[IO](context)) - _ <- OptionT(lambda(event, context)).foreachF { result => - Resource.fromAutoCloseable(IO(new OutputStreamWriter(output))).use { writer => - IO.blocking(Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer)) - } - } - } yield () - } - .onFinalize(IO.blocking(input.close()) &> IO.blocking(output.close())) - .use_ - } + val event = jawn.decodeChannel[Event](Channels.newChannel(input)).fold(throw _, identity(_)) + val context = Context.fromJava[IO](runtimeContext) + dispatcher + .unsafeRunTimed( + lambda(event, context), + runtimeContext.getRemainingTimeInMillis().millis + ) + .foreach { result => + val writer = new OutputStreamWriter(output) + Printer.noSpaces.unsafePrintToAppendable(result.asJson, writer) + writer.flush() + } } } diff --git a/lambda/jvm/src/test/scala/feral/lambda/IOLambdaJvmSuite.scala b/lambda/jvm/src/test/scala/feral/lambda/IOLambdaJvmSuite.scala index fcb2cfb9..ceda72e9 100644 --- a/lambda/jvm/src/test/scala/feral/lambda/IOLambdaJvmSuite.scala +++ b/lambda/jvm/src/test/scala/feral/lambda/IOLambdaJvmSuite.scala @@ -62,7 +62,7 @@ class IOLambdaJvmSuite extends FunSuite { override def getInvokedFunctionArn(): String = "" override def getIdentity(): runtime.CognitoIdentity = null override def getClientContext(): runtime.ClientContext = null - override def getRemainingTimeInMillis(): Int = 0 + override def getRemainingTimeInMillis(): Int = Int.MaxValue override def getMemoryLimitInMB(): Int = 0 override def getLogger(): runtime.LambdaLogger = null }